Make rawUpdates Rx.Observers not unsubscribe when accessed from CordaServices - Do not allow rawUpdates subscribing from flows

This commit is contained in:
Kyriakos Tharrouniatis 2020-02-13 13:54:40 +00:00
parent f27e0641b4
commit 97ec3a18b1
8 changed files with 689 additions and 116 deletions

View File

@ -26,6 +26,19 @@ class FlowSafeSubject<T, R>(private val actual: Subject<T, R>) : Observer<T> by
}
}) {
override fun hasObservers(): Boolean {
return actual.hasObservers()
}
}
/**
* The [PreventSubscriptionsSubject] is used to prevent any subscriptions to a [Subject].
*/
class PreventSubscriptionsSubject<T, R>(private val actual: Subject<T, R>, errorAction: () -> Unit) : Observer<T> by actual,
Subject<T, R>(OnSubscribe<R> { _ ->
errorAction()
}) {
override fun hasObservers(): Boolean {
return actual.hasObservers()
}

View File

@ -1,34 +1,47 @@
package net.corda.node.services.vault
import co.paralleluniverse.strands.concurrent.Semaphore
import com.r3.dbfailure.contracts.DbFailureContract
import com.r3.dbfailure.workflows.CreateStateFlow
import com.r3.dbfailure.workflows.CreateStateFlow.Initiator
import com.r3.dbfailure.workflows.CreateStateFlow.errorTargetsToNum
import com.r3.dbfailure.workflows.DbListenerService
import com.r3.dbfailure.workflows.DbListenerService.MakeServiceThrowErrorFlow
import com.r3.dbfailure.workflows.SendStateFlow
import com.r3.transactionfailure.workflows.ErrorHandling
import com.r3.transactionfailure.workflows.ErrorHandling.CheckpointAfterErrorFlow
import net.corda.core.CordaRuntimeException
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.startFlow
import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.services.Permissions
import net.corda.node.services.statemachine.StaffedFlowHospital
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.OutOfProcess
import net.corda.testing.driver.driver
import net.corda.testing.node.User
import net.corda.testing.node.internal.findCordapp
import org.assertj.core.api.Assertions
import org.junit.After
import org.junit.Assert
import org.junit.Test
import java.lang.IllegalStateException
import java.sql.SQLException
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import javax.persistence.PersistenceException
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
@ -50,6 +63,8 @@ class VaultObserverExceptionTest {
StaffedFlowHospital.onFlowAdmitted.clear()
DbListenerService.onError = null
DbListenerService.safeSubscription = true
DbListenerService.onNextVisited = {}
DbListenerService.onErrorVisited = null
}
/**
@ -75,9 +90,9 @@ class VaultObserverExceptionTest {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
aliceNode.rpc.startFlow(
::Initiator,
"Syntax Error in Custom SQL",
CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError)
CreateStateFlow::Initiator,
"Syntax Error in Custom SQL",
CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError)
).returnValue.then { testControlFuture.complete(false) }
val foundExpectedException = testControlFuture.getOrThrow(30.seconds)
@ -109,7 +124,7 @@ class VaultObserverExceptionTest {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
aliceNode.rpc.startFlow(
::Initiator,
CreateStateFlow::Initiator,
"Syntax Error in Custom SQL",
CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError)
).returnValue.then { testControlFuture.complete(false) }
@ -138,7 +153,7 @@ class VaultObserverExceptionTest {
cordappsForAllNodes = testCordapps())) {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
aliceNode.rpc.startFlow(::Initiator, "Exception", CreateStateFlow.errorTargetsToNum(
aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "Exception", CreateStateFlow.errorTargetsToNum(
CreateStateFlow.ErrorTarget.ServiceThrowMotherOfAllExceptions,
CreateStateFlow.ErrorTarget.FlowSwallowErrors))
waitUntilHospitalised.acquire() // wait here until flow gets hospitalised
@ -166,9 +181,9 @@ class VaultObserverExceptionTest {
cordappsForAllNodes = testCordapps())) {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
aliceNode.rpc.startFlow(::Initiator, "InvalidParameterException", CreateStateFlow.errorTargetsToNum(
CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter,
CreateStateFlow.ErrorTarget.FlowSwallowErrors))
aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "InvalidParameterException", CreateStateFlow.errorTargetsToNum(
CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter,
CreateStateFlow.ErrorTarget.FlowSwallowErrors))
waitUntilHospitalised.acquire() // wait here until flow gets hospitalised
}
@ -202,7 +217,7 @@ class VaultObserverExceptionTest {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
assertFailsWith<TimeoutException>("PersistenceException") {
aliceNode.rpc.startFlow(::Initiator, "EntityManager", errorTargetsToNum(
aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "EntityManager", errorTargetsToNum(
CreateStateFlow.ErrorTarget.TxInvalidState))
.returnValue.getOrThrow(30.seconds)
}
@ -235,7 +250,7 @@ class VaultObserverExceptionTest {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
val flowHandle = aliceNode.rpc.startFlow(
::Initiator, "EntityManager",
CreateStateFlow::Initiator, "EntityManager",
CreateStateFlow.errorTargetsToNum(
CreateStateFlow.ErrorTarget.TxInvalidState,
CreateStateFlow.ErrorTarget.FlowSwallowErrors))
@ -263,7 +278,7 @@ class VaultObserverExceptionTest {
cordappsForAllNodes = testCordapps())) {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
val flowHandle = aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(
val flowHandle = aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(
CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError,
CreateStateFlow.ErrorTarget.FlowSwallowErrors))
val flowResult = flowHandle.returnValue
@ -286,7 +301,7 @@ class VaultObserverExceptionTest {
cordappsForAllNodes = testCordapps())) {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
val flowHandle = aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(
val flowHandle = aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(
CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError,
CreateStateFlow.ErrorTarget.ServiceSwallowErrors))
val flowResult = flowHandle.returnValue
@ -316,20 +331,20 @@ class VaultObserverExceptionTest {
}
driver(DriverParameters(
inMemoryDB = false,
startNodesInProcess = true,
isDebug = true,
cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"),
findCordapp("com.r3.dbfailure.workflows"),
findCordapp("com.r3.transactionfailure.workflows"),
findCordapp("com.r3.dbfailure.schemas")))) {
inMemoryDB = false,
startNodesInProcess = true,
isDebug = true,
cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"),
findCordapp("com.r3.dbfailure.workflows"),
findCordapp("com.r3.transactionfailure.workflows"),
findCordapp("com.r3.dbfailure.schemas")))) {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val node = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
node.rpc.startFlow(::CheckpointAfterErrorFlow, CreateStateFlow.errorTargetsToNum(
CreateStateFlow.ErrorTarget.ServiceThrowMotherOfAllExceptions, // throw not persistence exception
CreateStateFlow.ErrorTarget.FlowSwallowErrors
)
)
)
waitUntilHospitalised.acquire()
@ -364,9 +379,9 @@ class VaultObserverExceptionTest {
cordappsForAllNodes = testCordapps())) {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
aliceNode.rpc.startFlow(::Initiator, "Exception", CreateStateFlow.errorTargetsToNum(
CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter,
CreateStateFlow.ErrorTarget.FlowSwallowErrors))
aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "Exception", CreateStateFlow.errorTargetsToNum(
CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter,
CreateStateFlow.ErrorTarget.FlowSwallowErrors))
waitUntilHospitalised.acquire() // wait here until flow gets hospitalised
}
@ -379,7 +394,7 @@ class VaultObserverExceptionTest {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser), startInSameProcess = false).getOrThrow()
aliceNode.rpc.startFlow(::MakeServiceThrowErrorFlow).returnValue.getOrThrow()
aliceNode.rpc.startFlow(::Initiator, "UnrecoverableError", CreateStateFlow.errorTargetsToNum(
aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "UnrecoverableError", CreateStateFlow.errorTargetsToNum(
CreateStateFlow.ErrorTarget.ServiceThrowUnrecoverableError))
val terminated = (aliceNode as OutOfProcess).process.waitFor(30, TimeUnit.SECONDS)
@ -398,4 +413,369 @@ class VaultObserverExceptionTest {
}
}
/**
* An error is thrown inside of the [VaultService.rawUpdates] observable while recording a transaction inside of the initiating node.
*
* This causes the transaction to not be saved on the local node but the notary still records the transaction as spent. The transaction
* also is not send to the counterparty node since it failed before reaching the send. Therefore no subscriber events occur on the
* counterparty node.
*
* More importantly, the observer listening to the [VaultService.rawUpdates] observable should not unsubscribe.
*
* Check onNext is visited the correct number of times.
*
* This test causes 2 failures inside of the observer to ensure that the observer is still subscribed.
*/
@Test
fun `Throw user error in VaultService rawUpdates during FinalityFlow blows up the flow but does not break the Observer - onNext check`() {
var observationCounter = 0
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> ++observationCounter }
val rawUpdatesCount = ConcurrentHashMap<Party, Int>()
DbListenerService.onNextVisited = { party ->
if (rawUpdatesCount.putIfAbsent(party, 1) != null) {
rawUpdatesCount.computeIfPresent(party) { _, count -> count + 1 }
}
}
val user = User("user", "foo", setOf(Permissions.all()))
driver(DriverParameters(startNodesInProcess = true,
cordappsForAllNodes = listOf(
findCordapp("com.r3.dbfailure.contracts"),
findCordapp("com.r3.dbfailure.workflows"),
findCordapp("com.r3.dbfailure.schemas")
),inMemoryDB = false)
) {
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val bobNode = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
val notary = defaultNotaryHandle.nodeHandles.getOrThrow().first()
val startErrorInObservableWhenConsumingState = {
val stateId = aliceNode.rpc.startFlow(
CreateStateFlow::Initiator,
"AllGood",
errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxErrorOnConsumed)
).returnValue.getOrThrow(30.seconds)
println("Created new state")
val flowHandle = aliceNode.rpc.startFlow(
SendStateFlow::PassErroneousOwnableState, // throws at consumed state -> should end up in hospital -> flow should hang
stateId,
errorTargetsToNum(CreateStateFlow.ErrorTarget.NoError),
bobNode.nodeInfo.legalIdentities.first()
)
Assertions.assertThatExceptionOfType(TimeoutException::class.java)
.isThrownBy { flowHandle.returnValue.getOrThrow(20.seconds) }
stateId
}
assertEquals(0, notary.getNotarisedTransactionIds().size)
println("First set of flows")
val stateId = startErrorInObservableWhenConsumingState()
assertEquals(0, aliceNode.getStatesById(stateId, Vault.StateStatus.CONSUMED).size)
assertEquals(0, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size)
assertEquals(1, notary.getNotarisedTransactionIds().size)
assertEquals(1, observationCounter)
assertEquals(2, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
println("Second set of flows")
val stateId2 = startErrorInObservableWhenConsumingState()
assertEquals(0, aliceNode.getStatesById(stateId2, Vault.StateStatus.CONSUMED).size)
assertEquals(0, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size)
assertEquals(2, notary.getNotarisedTransactionIds().size)
assertEquals(2, observationCounter)
assertEquals(4, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
}
}
/**
* An error is thrown inside of the [VaultService.rawUpdates] observable while recording a transaction inside of the initiating node.
*
* This causes the transaction to not be saved on the local node but the notary still records the transaction as spent. The transaction
* also is not send to the counterparty node since it failed before reaching the send. Therefore no subscriber events occur on the
* counterparty node.
*
* More importantly, the observer listening to the [VaultService.rawUpdates] observable should not unsubscribe.
*
* Check onNext and onError are visited the correct number of times.
*
* This test causes 2 failures inside of the observer to ensure that the observer is still subscribed.
*/
@Test
fun `Throw user error in VaultService rawUpdates during FinalityFlow blows up the flow but does not break the Observer - onNext and onError check`() {
var observationCounter = 0
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> ++observationCounter }
val rawUpdatesCount = ConcurrentHashMap<Party, Int>()
DbListenerService.onNextVisited = { party ->
if (rawUpdatesCount.putIfAbsent(party, 1) != null) {
rawUpdatesCount.computeIfPresent(party) { _, count -> count + 1 }
}
}
DbListenerService.onError = {/*just rethrow - we just want to check that onError gets visited by parties*/ throw it}
DbListenerService.onErrorVisited = { party ->
if (rawUpdatesCount.putIfAbsent(party, 1) != null) {
rawUpdatesCount.computeIfPresent(party) { _, count -> count + 1 }
}
}
val user = User("user", "foo", setOf(Permissions.all()))
driver(DriverParameters(startNodesInProcess = true,
cordappsForAllNodes = listOf(
findCordapp("com.r3.dbfailure.contracts"),
findCordapp("com.r3.dbfailure.workflows"),
findCordapp("com.r3.dbfailure.schemas")
),
inMemoryDB = false)
) {
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val bobNode = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
val notary = defaultNotaryHandle.nodeHandles.getOrThrow().first()
val startErrorInObservableWhenConsumingState = {
val stateId = aliceNode.rpc.startFlow(
CreateStateFlow::Initiator,
"AllGood",
// should be a hospital exception
errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxErrorOnConsumed)
).returnValue.getOrThrow(30.seconds)
val flowHandle = aliceNode.rpc.startFlow(
SendStateFlow::PassErroneousOwnableState,
stateId,
errorTargetsToNum(CreateStateFlow.ErrorTarget.NoError),
bobNode.nodeInfo.legalIdentities.first()
)
Assertions.assertThatExceptionOfType(TimeoutException::class.java)
.isThrownBy { flowHandle.returnValue.getOrThrow(20.seconds) }
stateId
}
assertEquals(0, notary.getNotarisedTransactionIds().size)
val stateId = startErrorInObservableWhenConsumingState()
assertEquals(0, aliceNode.getStatesById(stateId, Vault.StateStatus.CONSUMED).size)
assertEquals(0, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size)
assertEquals(1, notary.getNotarisedTransactionIds().size)
assertEquals(1, observationCounter)
assertEquals(3, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
val stateId2 = startErrorInObservableWhenConsumingState()
assertEquals(0, aliceNode.getStatesById(stateId2, Vault.StateStatus.CONSUMED).size)
assertEquals(0, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size)
assertEquals(2, notary.getNotarisedTransactionIds().size)
assertEquals(2, observationCounter)
assertEquals(6, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
}
}
/**
* An error is thrown inside of the [VaultService.rawUpdates] observable while recording a transaction inside of the counterparty node.
*
* This causes the transaction to not be saved on the local node but the notary still records the transaction as spent.
* Observer events are recorded on both the initiating node and the counterparty node.
*
* More importantly, the observer listening to the [VaultService.rawUpdates] observable should not unsubscribe.
*
* This test causes 2 failures inside of the observer to ensure that the observer is still subscribed.
*/
@Test
fun `Throw user error in VaultService rawUpdates during counterparty FinalityFlow blows up the flow but does not break the Observer`() {
var observationCounter = 0
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> ++observationCounter }
val rawUpdatesCount = ConcurrentHashMap<Party, Int>()
DbListenerService.onNextVisited = { party ->
if (rawUpdatesCount.putIfAbsent(party, 1) != null) {
rawUpdatesCount.computeIfPresent(party) { _, count -> count + 1 }
}
}
val user = User("user", "foo", setOf(Permissions.all()))
driver(DriverParameters(startNodesInProcess = true,
cordappsForAllNodes = listOf(
findCordapp("com.r3.dbfailure.contracts"),
findCordapp("com.r3.dbfailure.workflows"),
findCordapp("com.r3.dbfailure.schemas")
),
inMemoryDB = false)
) {
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val bobNode = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
val notary = defaultNotaryHandle.nodeHandles.getOrThrow().first()
val startErrorInObservableWhenCreatingSecondState = {
val stateId = aliceNode.rpc.startFlow(
CreateStateFlow::Initiator,
"AllGood",
errorTargetsToNum(CreateStateFlow.ErrorTarget.NoError)
).returnValue.getOrThrow(30.seconds)
aliceNode.rpc.startFlow(
SendStateFlow::PassErroneousOwnableState,
stateId,
errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError),
bobNode.nodeInfo.legalIdentities.first()
).returnValue.getOrThrow(20.seconds)
stateId
}
assertEquals(0, notary.getNotarisedTransactionIds().size)
val stateId = startErrorInObservableWhenCreatingSecondState()
assertEquals(1, aliceNode.getStatesById(stateId, Vault.StateStatus.CONSUMED).size)
assertEquals(0, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size)
assertEquals(1, notary.getNotarisedTransactionIds().size)
assertEquals(1, observationCounter)
assertEquals(2, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
assertEquals(1, rawUpdatesCount[bobNode.nodeInfo.singleIdentity()])
val stateId2 = startErrorInObservableWhenCreatingSecondState()
assertEquals(1, aliceNode.getStatesById(stateId2, Vault.StateStatus.CONSUMED).size)
assertEquals(2, aliceNode.getAllStates(Vault.StateStatus.CONSUMED).size)
assertEquals(0, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size)
assertEquals(2, notary.getNotarisedTransactionIds().size)
assertEquals(2, observationCounter)
assertEquals(4, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
assertEquals(2, rawUpdatesCount[bobNode.nodeInfo.singleIdentity()])
}
}
/**
* An error is thrown inside of the [VaultService.updates] observable while recording a transaction inside of the initiating node.
*
* This causes the transaction to not be saved on the local node but the notary still records the transaction as spent. The transaction
* also is not send to the counterparty node since it failed before reaching the send. Therefore no subscriber events occur on the
* counterparty node.
*
* More importantly, the observer listening to the [VaultService.updates] observable should not unsubscribe.
*
* This test causes 2 failures inside of the [rx.Observer] to ensure that the Observer is still subscribed.
*/
@Test
fun `Throw user error in VaultService rawUpdates during FinalityFlow blows up the flow but does not break the Observer`() {
var observationCounter = 0
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> ++observationCounter }
val rawUpdatesCount = ConcurrentHashMap<Party, Int>()
DbListenerService.onNextVisited = { party ->
if (rawUpdatesCount.putIfAbsent(party, 1) != null) {
rawUpdatesCount.computeIfPresent(party) { _, count -> count + 1 }
}
}
val user = User("user", "foo", setOf(Permissions.all()))
driver(DriverParameters(startNodesInProcess = true,
cordappsForAllNodes = listOf(
findCordapp("com.r3.dbfailure.contracts"),
findCordapp("com.r3.dbfailure.workflows"),
findCordapp("com.r3.dbfailure.schemas")
),
inMemoryDB = false)
) {
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val bobNode = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
val notary = defaultNotaryHandle.nodeHandles.getOrThrow().first()
val startErrorInObservableWhenConsumingState = {
val stateId = aliceNode.rpc.startFlow(
CreateStateFlow::Initiator,
"AllGood",
errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxErrorOnConsumed)
).returnValue.getOrThrow(30.seconds)
val flowHandle = aliceNode.rpc.startFlow(
SendStateFlow::PassErroneousOwnableState,
stateId,
errorTargetsToNum(CreateStateFlow.ErrorTarget.NoError),
bobNode.nodeInfo.legalIdentities.first()
)
Assertions.assertThatExceptionOfType(TimeoutException::class.java)
.isThrownBy { flowHandle.returnValue.getOrThrow(20.seconds) }
stateId
}
assertEquals(0, notary.getNotarisedTransactionIds().size)
val stateId = startErrorInObservableWhenConsumingState()
assertEquals(0, aliceNode.getStatesById(stateId, Vault.StateStatus.CONSUMED).size)
assertEquals(1, aliceNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size)
assertEquals(0, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size)
assertEquals(1, notary.getNotarisedTransactionIds().size)
assertEquals(1, observationCounter)
assertEquals(2, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
val stateId2 = startErrorInObservableWhenConsumingState()
assertEquals(0, aliceNode.getStatesById(stateId2, Vault.StateStatus.CONSUMED).size)
assertEquals(2, aliceNode.getAllStates(Vault.StateStatus.UNCONSUMED).size)
assertEquals(0, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size)
assertEquals(2, notary.getNotarisedTransactionIds().size)
assertEquals(4, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
}
}
@Test
fun `Subscribing to NodeVaultService rawUpdates from a flow is not allowed` () {
val user = User("user", "foo", setOf(Permissions.all()))
driver(DriverParameters(startNodesInProcess = true,
cordappsForAllNodes = listOf(
findCordapp("com.r3.dbfailure.contracts"),
findCordapp("com.r3.dbfailure.workflows"),
findCordapp("com.r3.dbfailure.schemas")
),
inMemoryDB = false)
) {
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val future = aliceNode.rpc.startFlow(ErrorHandling::SubscribingRawUpdatesFlow).returnValue
assertFailsWith<CordaRuntimeException>("Cannot subscribe to NodeVaultService.rawUpdates from a flow!") {
future.getOrThrow(30.seconds)
}
}
}
//TODO add retry from checkpoint test
@Test
fun `Failing Observer wrapped with FlowSafeSubscriber will remain and re-called upon flow retry`() {
}
private fun NodeHandle.getNotarisedTransactionIds(): List<String> {
return rpc.startFlowDynamic(SendStateFlow.NotarisedTxs::class.java).returnValue.getOrThrow()
}
private fun NodeHandle.getStatesById(id: UniqueIdentifier?, status: Vault.StateStatus): List<StateAndRef<DbFailureContract.TestState>> {
return rpc.vaultQueryByCriteria(
QueryCriteria.LinearStateQueryCriteria(
linearId = if (id != null) listOf(id) else null,
status = status
), DbFailureContract.TestState::class.java
).states
}
private fun NodeHandle.getAllStates(status: Vault.StateStatus): List<StateAndRef<DbFailureContract.TestState>> {
return getStatesById(null, status)
}
}

View File

@ -5,6 +5,7 @@ import co.paralleluniverse.strands.Strand
import net.corda.core.contracts.*
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.containsAny
import net.corda.core.flows.FlowException
import net.corda.core.flows.HospitalizeFlowException
import net.corda.core.internal.*
import net.corda.core.messaging.DataFeed
@ -209,7 +210,24 @@ class NodeVaultService(
}
override val rawUpdates: Observable<Vault.Update<ContractState>>
get() = mutex.locked { _rawUpdatesPublisher }
get() = mutex.locked {
if (FlowStateMachineImpl.currentStateMachine() != null) {
// we are inside a flow! we cannot allow flows to subscribe observers,
// because if a flow adds a subscriber; if the observer under the subscriber holds references to
// flow's properties, essentially to fiber's properties then, since it does not unsubscribes on flow's/ fiber's completion,
// it could prevent the flow/ fiber swapped our of memory.
PreventSubscriptionsSubject(_rawUpdatesPublisher) {
log.error("Cannot subscribe to NodeVaultService.rawUpdates from a flow! " +
"- hospitalising the flow ")
throw FlowException("Cannot subscribe to NodeVaultService.rawUpdates from a flow! ")
}
} else {
// we are not inside a flow; we are most likely inside a CordaService,
// we will wrap with 'safe subscriptions' here, namely add -not unsubscribing- subscribers.
FlowSafeSubject(_rawUpdatesPublisher)
}
}
override val updates: Observable<Vault.Update<ContractState>>
get() = mutex.locked { _updatesInDbTx }

View File

@ -1,12 +1,13 @@
package com.r3.dbfailure.contracts
import com.r3.dbfailure.schemas.DbFailureSchemaV1
import net.corda.core.contracts.CommandAndState
import net.corda.core.contracts.CommandData
import net.corda.core.contracts.Contract
import net.corda.core.contracts.LinearState
import net.corda.core.contracts.OwnableState
import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.PersistentState
import net.corda.core.schemas.QueryableState
@ -21,23 +22,31 @@ class DbFailureContract : Contract {
class TestState(
override val linearId: UniqueIdentifier,
val particpant: Party,
override val participants: List<AbstractParty>,
val randomValue: String?,
val errorTarget: Int = 0
) : LinearState, QueryableState {
val errorTarget: Int = 0,
override val owner: AbstractParty
) : LinearState, QueryableState, OwnableState {
override val participants: List<AbstractParty> = listOf(particpant)
override fun supportedSchemas(): Iterable<MappedSchema> = listOf(DbFailureSchemaV1)
override fun generateMappedObject(schema: MappedSchema): PersistentState {
return if (schema is DbFailureSchemaV1){
DbFailureSchemaV1.PersistentTestState( particpant.name.toString(), randomValue, errorTarget, linearId.id)
DbFailureSchemaV1.PersistentTestState( participants.toString(), randomValue, errorTarget, linearId.id)
}
else {
throw IllegalArgumentException("Unsupported schema $schema")
}
}
override fun withNewOwner(newOwner: AbstractParty): CommandAndState {
return CommandAndState(Commands.Send(), TestState(this.linearId, this.participants.plus(newOwner).toSet().toList(), this.randomValue, this.errorTarget, newOwner))
}
fun withNewOwnerAndErrorTarget(newOwner: AbstractParty, errorTarget: Int): CommandAndState {
return CommandAndState(Commands.Send(), TestState(this.linearId, this.participants.plus(newOwner).toSet().toList(), this.randomValue, errorTarget, newOwner))
}
}
override fun verify(tx: LedgerTransaction) {
@ -46,5 +55,6 @@ class DbFailureContract : Contract {
interface Commands : CommandData{
class Create: Commands
class Send : Commands
}
}

View File

@ -28,6 +28,7 @@ object CreateStateFlow {
ServiceThrowInvalidParameter(6),
ServiceThrowMotherOfAllExceptions(7),
ServiceThrowUnrecoverableError(8),
ServiceSqlSyntaxErrorOnConsumed(9),
TxInvalidState(10),
FlowSwallowErrors(100),
ServiceSwallowErrors(1000)
@ -69,10 +70,11 @@ object CreateStateFlow {
val txTarget = getTxTarget(errorTarget)
logger.info("Test flow: The tx error target is $txTarget")
val state = DbFailureContract.TestState(
UniqueIdentifier(),
ourIdentity,
if (txTarget == CreateStateFlow.ErrorTarget.TxInvalidState) null else randomValue,
errorTarget)
UniqueIdentifier(),
listOf(ourIdentity),
if (txTarget == CreateStateFlow.ErrorTarget.TxInvalidState) null else randomValue,
errorTarget, ourIdentity
)
val txCommand = Command(DbFailureContract.Commands.Create(), ourIdentity.owningKey)
logger.info("Test flow: tx builder")

View File

@ -4,12 +4,14 @@ import com.r3.dbfailure.contracts.DbFailureContract
import net.corda.core.contracts.ContractState
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.Party
import net.corda.core.node.AppServiceHub
import net.corda.core.node.services.CordaService
import net.corda.core.node.services.Vault
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.contextLogger
import rx.observers.Subscribers
import java.lang.IllegalStateException
import java.security.InvalidParameterException
@CordaService
@ -17,22 +19,120 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
companion object {
val log = contextLogger()
var onError: ((Throwable) -> Unit)? = null
// make the service throw an unrecoverable error (should be executed in an outOfProcess node so that it wont halt testing jvm)
var throwUnrecoverableError = false
var safeSubscription = true
var onNextVisited: (Party) -> Unit = {}
var onErrorVisited: ((Party) -> Unit)? = null
}
init {
val onNext: (Vault.Update<ContractState>) -> Unit =
{ (_, produced) ->
produced.forEach {
val contractState = it.state.data as? DbFailureContract.TestState
@Suppress("TooGenericExceptionCaught") // this is fully intentional here, to allow twiddling with exceptions
try {
{ (consumed, produced) ->
onNextVisited(services.myInfo.legalIdentities.first())
produced.forEach {
val contractState = it.state.data as? DbFailureContract.TestState
@Suppress("TooGenericExceptionCaught") // this is fully intentional here, to allow twiddling with exceptions
try {
when (CreateStateFlow.getServiceTarget(contractState?.errorTarget)) {
CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError -> {
log.info("Fail with syntax error on raw statement")
val session = services.jdbcSession()
val statement = session.createStatement()
statement.execute(
"UPDATE FAIL_TEST_STATES \n" +
"BLAAA RANDOM_VALUE = NULL\n" +
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
)
log.info("SQL result: ${statement.resultSet}")
}
CreateStateFlow.ErrorTarget.ServiceNullConstraintViolation -> {
log.info("Fail with null constraint violation on raw statement")
val session = services.jdbcSession()
val statement = session.createStatement()
statement.execute(
"UPDATE FAIL_TEST_STATES \n" +
"SET RANDOM_VALUE = NULL\n" +
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
)
log.info("SQL result: ${statement.resultSet}")
}
CreateStateFlow.ErrorTarget.ServiceValidUpdate -> {
log.info("Update current statement")
val session = services.jdbcSession()
val statement = session.createStatement()
statement.execute(
"UPDATE FAIL_TEST_STATES \n" +
"SET RANDOM_VALUE = '${contractState!!.randomValue} Updated by service'\n" +
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
)
log.info("SQL result: ${statement.resultSet}")
}
CreateStateFlow.ErrorTarget.ServiceReadState -> {
log.info("Read current state from db")
val session = services.jdbcSession()
val statement = session.createStatement()
statement.execute(
"SELECT * FROM FAIL_TEST_STATES \n" +
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
)
log.info("SQL result: ${statement.resultSet}")
}
CreateStateFlow.ErrorTarget.ServiceCheckForState -> {
log.info("Check for currently written state in the db")
val session = services.jdbcSession()
val statement = session.createStatement()
val rs = statement.executeQuery(
"SELECT COUNT(*) FROM FAIL_TEST_STATES \n" +
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
)
val numOfRows = if (rs.next()) rs.getInt("COUNT(*)") else 0
log.info(
"Found a state with tx:ind ${it.ref.txhash}:${it.ref.index} in " +
"TEST_FAIL_STATES: ${if (numOfRows > 0) "Yes" else "No"}"
)
}
CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter -> {
log.info("Throw InvalidParameterException")
throw InvalidParameterException("Toys out of pram")
}
CreateStateFlow.ErrorTarget.ServiceThrowMotherOfAllExceptions -> {
log.info("Throw Exception")
throw Exception("Mother of all exceptions")
}
CreateStateFlow.ErrorTarget.ServiceThrowUnrecoverableError -> {
// this bit of code should only work in a OutOfProcess node,
// otherwise it will kill the testing jvm (including the testing thread)
if (throwUnrecoverableError) {
log.info("Throw Unrecoverable error")
throw OutOfMemoryError("Unrecoverable error")
}
}
else -> {
// do nothing, everything else must be handled elsewhere
}
}
} catch (t: Throwable) {
if (CreateStateFlow.getServiceExceptionHandlingTarget(contractState?.errorTarget)
== CreateStateFlow.ErrorTarget.ServiceSwallowErrors
) {
log.warn("Service not letting errors escape", t)
} else {
throw t
}
}
}
consumed.forEach {
val contractState = it.state.data as? DbFailureContract.TestState
log.info("Test Service: Got state ${if (contractState == null) "null" else " test state with error target ${contractState.errorTarget}"}")
when (CreateStateFlow.getServiceTarget(contractState?.errorTarget)) {
CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError -> {
CreateStateFlow.ErrorTarget.ServiceSqlSyntaxErrorOnConsumed -> {
log.info("Fail with syntax error on raw statement")
val session = services.jdbcSession()
val statement = session.createStatement()
@ -43,83 +143,20 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
)
log.info("SQL result: ${statement.resultSet}")
}
CreateStateFlow.ErrorTarget.ServiceNullConstraintViolation -> {
log.info("Fail with null constraint violation on raw statement")
val session = services.jdbcSession()
val statement = session.createStatement()
statement.execute(
"UPDATE FAIL_TEST_STATES \n" +
"SET RANDOM_VALUE = NULL\n" +
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
)
log.info("SQL result: ${statement.resultSet}")
}
CreateStateFlow.ErrorTarget.ServiceValidUpdate -> {
log.info("Update current statement")
val session = services.jdbcSession()
val statement = session.createStatement()
statement.execute(
"UPDATE FAIL_TEST_STATES \n" +
"SET RANDOM_VALUE = '${contractState!!.randomValue} Updated by service'\n" +
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
)
log.info("SQL result: ${statement.resultSet}")
}
CreateStateFlow.ErrorTarget.ServiceReadState -> {
log.info("Read current state from db")
val session = services.jdbcSession()
val statement = session.createStatement()
statement.execute(
"SELECT * FROM FAIL_TEST_STATES \n" +
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
)
log.info("SQL result: ${statement.resultSet}")
}
CreateStateFlow.ErrorTarget.ServiceCheckForState -> {
log.info("Check for currently written state in the db")
val session = services.jdbcSession()
val statement = session.createStatement()
val rs = statement.executeQuery(
"SELECT COUNT(*) FROM FAIL_TEST_STATES \n" +
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
)
val numOfRows = if (rs.next()) rs.getInt("COUNT(*)") else 0
log.info("Found a state with tx:ind ${it.ref.txhash}:${it.ref.index} in " +
"TEST_FAIL_STATES: ${if (numOfRows > 0) "Yes" else "No"}")
}
CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter -> {
log.info("Throw InvalidParameterException")
throw InvalidParameterException("Toys out of pram")
}
CreateStateFlow.ErrorTarget.ServiceThrowMotherOfAllExceptions -> {
log.info("Throw Exception")
throw Exception("Mother of all exceptions")
}
CreateStateFlow.ErrorTarget.ServiceThrowUnrecoverableError -> {
// this bit of code should only work in a OutOfProcess node,
// otherwise it will kill the testing jvm (including the testing thread)
if (throwUnrecoverableError) {
log.info("Throw Unrecoverable error")
throw OutOfMemoryError("Unrecoverable error")
}
}
else -> {
// do nothing, everything else must be handled elsewhere
}
}
} catch (t: Throwable) {
if (CreateStateFlow.getServiceExceptionHandlingTarget(contractState?.errorTarget)
== CreateStateFlow.ErrorTarget.ServiceSwallowErrors) {
log.warn("Service not letting errors escape", t)
} else {
throw t
}
}
}
}
if (onError != null) {
services.vaultService.rawUpdates.subscribe(onNext, onError) // onError is defined
val onErrorWrapper: ((Throwable) -> Unit)? = {
onErrorVisited?.let {
it(services.myInfo.legalIdentities.first())
}
onError!!(it)
}
services.vaultService.rawUpdates.subscribe(onNext, onErrorWrapper) // onError is defined
} else if (onErrorVisited != null) {
throw IllegalStateException("A DbListenerService.onError needs to be defined!")
} else {
if (safeSubscription) {
services.vaultService.rawUpdates.subscribe(onNext)

View File

@ -0,0 +1,101 @@
package com.r3.dbfailure.workflows
import co.paralleluniverse.fibers.Suspendable
import com.r3.dbfailure.contracts.DbFailureContract
import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.ReceiveFinalityFlow
import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.Party
import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.unwrap
object SendStateFlow {
/**
* Creates a [DbFailureContract.TestState], signs it, collects a signature from a separate node and then calls [FinalityFlow] flow.
* Can throw in various stages
*/
@StartableByRPC
@InitiatingFlow
class PassErroneousOwnableState(private val stateId: UniqueIdentifier, private val errorTarget: Int, private val counterParty: Party) :
FlowLogic<Unit>() {
@Suspendable
override fun call() {
logger.info("Test flow: starting")
val notary = serviceHub.networkMapCache.notaryIdentities[0]
logger.info("Test flow: create counterparty session")
val recipientSession = initiateFlow(counterParty)
val queryCriteria = QueryCriteria.LinearStateQueryCriteria(linearId = listOf(stateId), status = Vault.StateStatus.UNCONSUMED)
val inputState = serviceHub.vaultService.queryBy(DbFailureContract.TestState::class.java, queryCriteria).states.singleOrNull()
?: throw FlowException("Failed to find single state for linear id $stateId")
logger.info("Test flow: tx builder")
val commandAndState = inputState.state.data.withNewOwnerAndErrorTarget(counterParty, errorTarget)
val txBuilder = TransactionBuilder(notary)
.addInputState(inputState)
.addOutputState(commandAndState.ownableState)
.addCommand(commandAndState.command, listOf(ourIdentity.owningKey, counterParty.owningKey))
logger.info("Test flow: verify")
txBuilder.verify(serviceHub)
val signedTx = serviceHub.signInitialTransaction(txBuilder)
logger.info("Test flow: send for counterparty signing")
recipientSession.send(signedTx)
logger.info("Test flow: Waiting to receive counter signed transaction")
val counterSignedTx = recipientSession.receive<SignedTransaction>().unwrap { it }
logger.info("Test flow: Received counter sigend transaction, invoking finality")
subFlow(FinalityFlow(counterSignedTx, recipientSession))
logger.info("Test flow: Finishing")
}
}
@InitiatedBy(PassErroneousOwnableState::class)
class PassErroneousOwnableStateReceiver(private val otherSide: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
logger.info("Test flow counterparty: starting")
val signedTx = otherSide.receive<SignedTransaction>().unwrap { it }
logger.info("Test flow counterparty: received TX, signing")
val counterSignedTx = serviceHub.addSignature(signedTx)
logger.info("Test flow counterparty: calling hookBeforeCounterPartyAnswers")
logger.info("Test flow counterparty: Answer with countersigned transaction")
otherSide.send(counterSignedTx)
logger.info("Test flow counterparty: calling hookAfterCounterPartyAnswers")
// Not ideal that we have to do this check, but we must as FinalityFlow does not send locally
if (!serviceHub.myInfo.isLegalIdentity(otherSide.counterparty)) {
logger.info("Test flow counterparty: Waiting for finality")
subFlow(ReceiveFinalityFlow(otherSide))
}
logger.info("Test flow counterparty: Finishing")
}
}
@StartableByRPC
class NotarisedTxs : FlowLogic<List<String>>() {
override fun call(): List<String> {
val session = serviceHub.jdbcSession()
val statement = session.createStatement()
statement.execute("SELECT TRANSACTION_ID FROM NODE_NOTARY_COMMITTED_TXS;")
val result = mutableListOf<String>()
while (statement.resultSet.next()) {
result.add(statement.resultSet.getString(1))
}
return result
}
}
}

View File

@ -30,9 +30,10 @@ object ErrorHandling {
val txTarget = CreateStateFlow.getTxTarget(errorTarget)
val state = DbFailureContract.TestState(
UniqueIdentifier(),
ourIdentity,
listOf(ourIdentity),
if (txTarget == CreateStateFlow.ErrorTarget.TxInvalidState) null else "valid hibernate value",
errorTarget)
errorTarget,
ourIdentity)
val txCommand = Command(DbFailureContract.Commands.Create(), ourIdentity.owningKey)
val txBuilder = TransactionBuilder(notary).addOutputState(state).addCommand(txCommand)
val signedTx = serviceHub.signInitialTransaction(txBuilder)
@ -51,4 +52,15 @@ object ErrorHandling {
}
}
@StartableByRPC
class SubscribingRawUpdatesFlow: FlowLogic<Unit>() {
override fun call() {
val rawUpdates = serviceHub.vaultService.rawUpdates
logger.info("Accessing rawUpdates in a flow is fine! ")
rawUpdates.subscribe {
println("However, adding a subscription will make the flow fail!")
}
}
}
}