Add fiber's id to log message and exception message

This commit is contained in:
Kyriakos Tharrouniatis 2020-02-13 16:27:16 +00:00
parent db8457ef3f
commit bc79ce8058
2 changed files with 22 additions and 18 deletions

View File

@ -748,10 +748,10 @@ class VaultObserverExceptionTest {
) {
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val future = aliceNode.rpc.startFlow(ErrorHandling::SubscribingRawUpdatesFlow).returnValue
val flowHandle = aliceNode.rpc.startFlow(ErrorHandling::SubscribingRawUpdatesFlow)
assertFailsWith<CordaRuntimeException>("Flow tried to subscribe an Rx.Observer to VaultService.rawUpdates - the subscription did not succeed ") {
future.getOrThrow(30.seconds)
assertFailsWith<CordaRuntimeException>("Flow ${flowHandle.id} tried to subscribe an Rx.Observer to VaultService.rawUpdates - the subscription did not succeed ") {
flowHandle.returnValue.getOrThrow(30.seconds)
}
}
}

View File

@ -211,24 +211,28 @@ class NodeVaultService(
override val rawUpdates: Observable<Vault.Update<ContractState>>
get() = mutex.locked {
if (FlowStateMachineImpl.currentStateMachine() != null) {
// we are inside a flow! we cannot allow flows to subscribe observers,
// because if a flow adds a subscriber; if the observer under the subscriber holds references to
// flow's properties, essentially to fiber's properties then, since it does not unsubscribes on flow's/ fiber's completion,
// it could prevent the flow/ fiber swapped our of memory.
PreventSubscriptionsSubject(_rawUpdatesPublisher) {
log.error("Flow tried to subscribe an Rx.Observer to VaultService.rawUpdates " +
"- the subscription did not succeed " +
"- aborting the flow ")
FlowStateMachineImpl.currentStateMachine()?.let {
// we are inside a flow; we cannot allow flows to subscribe Rx Observers,
// because the Observer could reference flow's properties, essentially fiber's properties then,
// since it does not unsubscribe on flow's/ fiber's completion,
// it could prevent the flow/ fiber -object- get garbage collected.
return PreventSubscriptionsSubject(_rawUpdatesPublisher) {
log.error(
"Flow ${it.id} tried to subscribe an Rx.Observer to VaultService.rawUpdates " +
"- the subscription did not succeed " +
"- aborting the flow "
)
throw FlowException("Flow tried to subscribe an Rx.Observer to VaultService.rawUpdates " +
"- the subscription did not succeed ")
throw FlowException(
"Flow ${it.id} tried to subscribe an Rx.Observer to VaultService.rawUpdates " +
"- the subscription did not succeed "
)
}
} else {
// we are not inside a flow; we are most likely inside a CordaService,
// we will wrap with 'safe subscriptions' here, namely add -not unsubscribing- subscribers.
FlowSafeSubject(_rawUpdatesPublisher)
}
// we are not inside a flow, we are most likely inside a CordaService;
// we will wrap with 'flow safe subscriptions' here;
// every Observable.subscribe to this object should be wrapped with a FlowSafeSubscriber (-not unsubscribe- subscribers).
return FlowSafeSubject(_rawUpdatesPublisher)
}
override val updates: Observable<Vault.Update<ContractState>>