Throw exception upon accessing VaultService.rawUpdates from within a flow

This commit is contained in:
Kyriakos Tharrouniatis 2020-02-17 13:50:43 +00:00
parent 3000d58022
commit b5682e3a4e
3 changed files with 18 additions and 36 deletions

View File

@ -9,9 +9,9 @@ import com.r3.dbfailure.workflows.DbListenerService.MakeServiceThrowErrorFlow
import com.r3.dbfailure.workflows.SendStateFlow import com.r3.dbfailure.workflows.SendStateFlow
import com.r3.transactionfailure.workflows.ErrorHandling import com.r3.transactionfailure.workflows.ErrorHandling
import com.r3.transactionfailure.workflows.ErrorHandling.CheckpointAfterErrorFlow import com.r3.transactionfailure.workflows.ErrorHandling.CheckpointAfterErrorFlow
import net.corda.core.CordaRuntimeException
import net.corda.core.contracts.StateAndRef import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.UniqueIdentifier import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.Party import net.corda.core.identity.Party
@ -737,7 +737,7 @@ class VaultObserverExceptionTest {
} }
@Test @Test
fun `Subscribing to NodeVaultService rawUpdates from a flow is not allowed` () { fun `Accessing NodeVaultService rawUpdates from a flow is not allowed` () {
val user = User("user", "foo", setOf(Permissions.all())) val user = User("user", "foo", setOf(Permissions.all()))
driver(DriverParameters(startNodesInProcess = true, driver(DriverParameters(startNodesInProcess = true,
cordappsForAllNodes = listOf( cordappsForAllNodes = listOf(
@ -751,10 +751,9 @@ class VaultObserverExceptionTest {
val flowHandle = aliceNode.rpc.startFlow(::SubscribingRawUpdatesFlow) val flowHandle = aliceNode.rpc.startFlow(::SubscribingRawUpdatesFlow)
assertFailsWith<CordaRuntimeException>( assertFailsWith<FlowException>(
"Flow ${SubscribingRawUpdatesFlow::class.java.name} tried to subscribe an Rx.Observer to VaultService.rawUpdates " + "Flow ${SubscribingRawUpdatesFlow::class.java.name} tried to access VaultService.rawUpdates " +
"- Rx.Observables should only be subscribed outside the context of a flow " + "- Rx.Observables should only be accessed to outside the context of a flow "
"- the subscription did not succeed "
) { ) {
flowHandle.returnValue.getOrThrow(30.seconds) flowHandle.returnValue.getOrThrow(30.seconds)
} }
@ -830,10 +829,11 @@ class VaultObserverExceptionTest {
@StartableByRPC @StartableByRPC
class SubscribingRawUpdatesFlow: FlowLogic<Unit>() { class SubscribingRawUpdatesFlow: FlowLogic<Unit>() {
override fun call() { override fun call() {
val rawUpdates = serviceHub.vaultService.rawUpdates logger.info("Accessing rawUpdates within a flow will throw! ")
logger.info("Accessing rawUpdates in a flow is fine! ") val rawUpdates = serviceHub.vaultService.rawUpdates // throws
logger.info("Code flow should never reach this logging or the following segment! ")
rawUpdates.subscribe { rawUpdates.subscribe {
println("However, adding a subscription will make the flow fail!") println("Code flow should never get in here!")
} }
} }
} }

View File

@ -36,16 +36,3 @@ class FlowSafeSubject<T, R>(private val actual: Subject<T, R>) : Observer<T> by
return actual.hasObservers() return actual.hasObservers()
} }
} }
/**
* The [PreventSubscriptionsSubject] is used to prevent any subscriptions to its underlying [Subject].
*/
class PreventSubscriptionsSubject<T, R>(private val actual: Subject<T, R>, errorAction: () -> Exception) : Observer<T> by actual,
Subject<T, R>(OnSubscribe<R> { _ ->
throw errorAction()
}) {
override fun hasObservers(): Boolean {
return actual.hasObservers()
}
}

View File

@ -19,7 +19,6 @@ import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.transactions.* import net.corda.core.transactions.*
import net.corda.core.utilities.* import net.corda.core.utilities.*
import net.corda.node.internal.FlowSafeSubject import net.corda.node.internal.FlowSafeSubject
import net.corda.node.internal.PreventSubscriptionsSubject
import net.corda.node.services.api.SchemaService import net.corda.node.services.api.SchemaService
import net.corda.node.services.api.VaultServiceInternal import net.corda.node.services.api.VaultServiceInternal
import net.corda.node.services.schema.PersistentStateService import net.corda.node.services.schema.PersistentStateService
@ -218,21 +217,17 @@ class NodeVaultService(
// because the Observer could reference flow's properties, essentially fiber's properties then, // because the Observer could reference flow's properties, essentially fiber's properties then,
// since it does not unsubscribe on flow's/ fiber's completion, // since it does not unsubscribe on flow's/ fiber's completion,
// it could prevent the flow/ fiber -object- get garbage collected. // it could prevent the flow/ fiber -object- get garbage collected.
return PreventSubscriptionsSubject(_rawUpdatesPublisher) {
log.error( log.error(
"Flow ${it.logic::class.java.name} tried to subscribe an Rx.Observer to VaultService.rawUpdates " + "Flow ${it.logic::class.java.name} tried to access VaultService.rawUpdates " +
"- Rx.Observables should only be subscribed to outside the context of a flow " + "- Rx.Observables should only be accessed to outside the context of a flow " +
"- the subscription did not succeed " +
"- aborting the flow " "- aborting the flow "
) )
FlowException( throw FlowException(
"Flow ${it.logic::class.java.name} tried to subscribe an Rx.Observer to VaultService.rawUpdates " + "Flow ${it.logic::class.java.name} tried to access VaultService.rawUpdates " +
"- Rx.Observables should only be subscribed to outside the context of a flow " + "- Rx.Observables should only be accessed to outside the context of a flow "
"- the subscription did not succeed "
) )
} }
}
// we are not inside a flow, we are most likely inside a CordaService; // we are not inside a flow, we are most likely inside a CordaService;
// we will expose, by default, subscribing of -non unsubscribing- rx.Observers to rawUpdates. // we will expose, by default, subscribing of -non unsubscribing- rx.Observers to rawUpdates.
return FlowSafeSubject(_rawUpdatesPublisher) return FlowSafeSubject(_rawUpdatesPublisher)