diff --git a/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt index 55e2cc5e94..77f913b5cf 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt @@ -9,9 +9,9 @@ import com.r3.dbfailure.workflows.DbListenerService.MakeServiceThrowErrorFlow import com.r3.dbfailure.workflows.SendStateFlow import com.r3.transactionfailure.workflows.ErrorHandling import com.r3.transactionfailure.workflows.ErrorHandling.CheckpointAfterErrorFlow -import net.corda.core.CordaRuntimeException import net.corda.core.contracts.StateAndRef import net.corda.core.contracts.UniqueIdentifier +import net.corda.core.flows.FlowException import net.corda.core.flows.FlowLogic import net.corda.core.flows.StartableByRPC import net.corda.core.identity.Party @@ -737,7 +737,7 @@ class VaultObserverExceptionTest { } @Test - fun `Subscribing to NodeVaultService rawUpdates from a flow is not allowed` () { + fun `Accessing NodeVaultService rawUpdates from a flow is not allowed` () { val user = User("user", "foo", setOf(Permissions.all())) driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = listOf( @@ -751,10 +751,9 @@ class VaultObserverExceptionTest { val flowHandle = aliceNode.rpc.startFlow(::SubscribingRawUpdatesFlow) - assertFailsWith( - "Flow ${SubscribingRawUpdatesFlow::class.java.name} tried to subscribe an Rx.Observer to VaultService.rawUpdates " + - "- Rx.Observables should only be subscribed outside the context of a flow " + - "- the subscription did not succeed " + assertFailsWith( + "Flow ${SubscribingRawUpdatesFlow::class.java.name} tried to access VaultService.rawUpdates " + + "- Rx.Observables should only be accessed to outside the context of a flow " ) { flowHandle.returnValue.getOrThrow(30.seconds) } @@ -830,10 +829,11 @@ class VaultObserverExceptionTest { @StartableByRPC class SubscribingRawUpdatesFlow: FlowLogic() { override fun call() { - val rawUpdates = serviceHub.vaultService.rawUpdates - logger.info("Accessing rawUpdates in a flow is fine! ") + logger.info("Accessing rawUpdates within a flow will throw! ") + val rawUpdates = serviceHub.vaultService.rawUpdates // throws + logger.info("Code flow should never reach this logging or the following segment! ") rawUpdates.subscribe { - println("However, adding a subscription will make the flow fail!") + println("Code flow should never get in here!") } } } diff --git a/node/src/main/kotlin/net/corda/node/internal/FlowSafeSubject.kt b/node/src/main/kotlin/net/corda/node/internal/FlowSafeSubject.kt index 7f1756c377..3880e5bce6 100644 --- a/node/src/main/kotlin/net/corda/node/internal/FlowSafeSubject.kt +++ b/node/src/main/kotlin/net/corda/node/internal/FlowSafeSubject.kt @@ -32,19 +32,6 @@ class FlowSafeSubject(private val actual: Subject) : Observer by } }) { - override fun hasObservers(): Boolean { - return actual.hasObservers() - } -} - -/** - * The [PreventSubscriptionsSubject] is used to prevent any subscriptions to its underlying [Subject]. - */ -class PreventSubscriptionsSubject(private val actual: Subject, errorAction: () -> Exception) : Observer by actual, - Subject(OnSubscribe { _ -> - throw errorAction() - }) { - override fun hasObservers(): Boolean { return actual.hasObservers() } diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index 9b859c3e47..e5526b283c 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -19,7 +19,6 @@ import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.transactions.* import net.corda.core.utilities.* import net.corda.node.internal.FlowSafeSubject -import net.corda.node.internal.PreventSubscriptionsSubject import net.corda.node.services.api.SchemaService import net.corda.node.services.api.VaultServiceInternal import net.corda.node.services.schema.PersistentStateService @@ -218,20 +217,16 @@ class NodeVaultService( // because the Observer could reference flow's properties, essentially fiber's properties then, // since it does not unsubscribe on flow's/ fiber's completion, // it could prevent the flow/ fiber -object- get garbage collected. - return PreventSubscriptionsSubject(_rawUpdatesPublisher) { - log.error( - "Flow ${it.logic::class.java.name} tried to subscribe an Rx.Observer to VaultService.rawUpdates " + - "- Rx.Observables should only be subscribed to outside the context of a flow " + - "- the subscription did not succeed " + - "- aborting the flow " - ) + log.error( + "Flow ${it.logic::class.java.name} tried to access VaultService.rawUpdates " + + "- Rx.Observables should only be accessed to outside the context of a flow " + + "- aborting the flow " + ) - FlowException( - "Flow ${it.logic::class.java.name} tried to subscribe an Rx.Observer to VaultService.rawUpdates " + - "- Rx.Observables should only be subscribed to outside the context of a flow " + - "- the subscription did not succeed " - ) - } + throw FlowException( + "Flow ${it.logic::class.java.name} tried to access VaultService.rawUpdates " + + "- Rx.Observables should only be accessed to outside the context of a flow " + ) } // we are not inside a flow, we are most likely inside a CordaService; // we will expose, by default, subscribing of -non unsubscribing- rx.Observers to rawUpdates.