From bc79ce80580fc881eec641542e9facdb81eb76ea Mon Sep 17 00:00:00 2001 From: Kyriakos Tharrouniatis Date: Thu, 13 Feb 2020 16:27:16 +0000 Subject: [PATCH] Add fiber's id to log message and exception message --- .../vault/VaultObserverExceptionTest.kt | 6 ++-- .../node/services/vault/NodeVaultService.kt | 34 +++++++++++-------- 2 files changed, 22 insertions(+), 18 deletions(-) diff --git a/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt index c88147d684..7caa55a9d1 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt @@ -748,10 +748,10 @@ class VaultObserverExceptionTest { ) { val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() - val future = aliceNode.rpc.startFlow(ErrorHandling::SubscribingRawUpdatesFlow).returnValue + val flowHandle = aliceNode.rpc.startFlow(ErrorHandling::SubscribingRawUpdatesFlow) - assertFailsWith("Flow tried to subscribe an Rx.Observer to VaultService.rawUpdates - the subscription did not succeed ") { - future.getOrThrow(30.seconds) + assertFailsWith("Flow ${flowHandle.id} tried to subscribe an Rx.Observer to VaultService.rawUpdates - the subscription did not succeed ") { + flowHandle.returnValue.getOrThrow(30.seconds) } } } diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index 486f3a45b1..e3f753b613 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -211,24 +211,28 @@ class NodeVaultService( override val rawUpdates: Observable> get() = mutex.locked { - if (FlowStateMachineImpl.currentStateMachine() != null) { - // we are inside a flow! we cannot allow flows to subscribe observers, - // because if a flow adds a subscriber; if the observer under the subscriber holds references to - // flow's properties, essentially to fiber's properties then, since it does not unsubscribes on flow's/ fiber's completion, - // it could prevent the flow/ fiber swapped our of memory. - PreventSubscriptionsSubject(_rawUpdatesPublisher) { - log.error("Flow tried to subscribe an Rx.Observer to VaultService.rawUpdates " + - "- the subscription did not succeed " + - "- aborting the flow ") + FlowStateMachineImpl.currentStateMachine()?.let { + // we are inside a flow; we cannot allow flows to subscribe Rx Observers, + // because the Observer could reference flow's properties, essentially fiber's properties then, + // since it does not unsubscribe on flow's/ fiber's completion, + // it could prevent the flow/ fiber -object- get garbage collected. + return PreventSubscriptionsSubject(_rawUpdatesPublisher) { + log.error( + "Flow ${it.id} tried to subscribe an Rx.Observer to VaultService.rawUpdates " + + "- the subscription did not succeed " + + "- aborting the flow " + ) - throw FlowException("Flow tried to subscribe an Rx.Observer to VaultService.rawUpdates " + - "- the subscription did not succeed ") + throw FlowException( + "Flow ${it.id} tried to subscribe an Rx.Observer to VaultService.rawUpdates " + + "- the subscription did not succeed " + ) } - } else { - // we are not inside a flow; we are most likely inside a CordaService, - // we will wrap with 'safe subscriptions' here, namely add -not unsubscribing- subscribers. - FlowSafeSubject(_rawUpdatesPublisher) } + // we are not inside a flow, we are most likely inside a CordaService; + // we will wrap with 'flow safe subscriptions' here; + // every Observable.subscribe to this object should be wrapped with a FlowSafeSubscriber (-not unsubscribe- subscribers). + return FlowSafeSubject(_rawUpdatesPublisher) } override val updates: Observable>