diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/Utils.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/Utils.kt new file mode 100644 index 0000000000..891a57202a --- /dev/null +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/Utils.kt @@ -0,0 +1,19 @@ +package net.corda.client.rpc + +import rx.Observable + +/** + * This function should be invoked on any unwanted Observables returned from RPC to release the server resources. + * + * subscribe({}, {}) was used instead of simply calling subscribe() + * because if an {@code onError} emission arrives (eg. due to an non-correct transaction, such as 'Not sufficient funds') + * then {@link OnErrorNotImplementedException} is thrown. As we won't handle exceptions from unused Observables, + * empty inputs are used to subscribe({}, {}). + */ +fun Observable.notUsed() { + try { + this.subscribe({}, {}).unsubscribe() + } catch (e: Exception) { + // Swallow any other exceptions as well. + } +} diff --git a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt index 596aad3142..53be1a1485 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt @@ -219,8 +219,34 @@ inline fun > CordaRPCOps.startFlow * @param returnValue A [ListenableFuture] of the flow's return value. */ @CordaSerializable -data class FlowHandle( +data class FlowHandle ( val id: StateMachineRunId, val progress: Observable, - val returnValue: ListenableFuture -) + val returnValue: ListenableFuture) : AutoCloseable { + + /** + * Use this function for flows that returnValue and progress are not going to be used or tracked, so as to free up server resources. + * Note that it won't really close if one subscribes on progress [Observable], but then forgets to unsubscribe. + */ + override fun close() { + returnValue.cancel(false) + progress.notUsed() + } +} + +/** + * This function should be invoked on any unwanted Observables returned from RPC to release the server resources. + * TODO: Delete this function when this file is moved to RPC module, as Observable.notUsed() exists there already. + * + * subscribe({}, {}) was used instead of simply calling subscribe() + * because if an {@code onError} emission arrives (eg. due to an non-correct transaction, such as 'Not sufficient funds') + * then {@link OnErrorNotImplementedException} is thrown. As we won't handle exceptions from unused Observables, + * empty inputs are used to subscribe({}, {}). + */ +fun Observable.notUsed() { + try { + this.subscribe({}, {}).unsubscribe() + } catch (e: Exception) { + // Swallow any other exceptions as well; we won't handle exceptions from unused Observables. + } +} diff --git a/tools/explorer/src/main/kotlin/net/corda/explorer/Main.kt b/tools/explorer/src/main/kotlin/net/corda/explorer/Main.kt index 840f985a86..ebc62f2445 100644 --- a/tools/explorer/src/main/kotlin/net/corda/explorer/Main.kt +++ b/tools/explorer/src/main/kotlin/net/corda/explorer/Main.kt @@ -12,6 +12,7 @@ import joptsimple.OptionParser import net.corda.client.jfx.model.Models import net.corda.client.jfx.model.observableValue import net.corda.client.mock.EventGenerator +import net.corda.client.rpc.notUsed import net.corda.contracts.asset.Cash import net.corda.core.contracts.GBP import net.corda.core.contracts.USD @@ -250,14 +251,14 @@ fun main(args: Array) { println("[$i] ISSUING ${command.amount} with ref ${command.issueRef} to ${command.recipient}") val cmd = command.startFlow(issuerRPCGBP) flowHandles["GBPIssuer"]?.add(cmd) - cmd.progress.subscribe({}, {})?.unsubscribe() + cmd.progress.notUsed() Unit }.generate(SplittableRandom()) issuerUSDEventGenerator.bankOfCordaIssueGenerator.map { command -> println("[$i] ISSUING ${command.amount} with ref ${command.issueRef} to ${command.recipient}") val cmd = command.startFlow(issuerRPCUSD) flowHandles["USDIssuer"]?.add(cmd) - cmd.progress.subscribe({}, {})?.unsubscribe() + cmd.progress.notUsed() Unit }.generate(SplittableRandom()) } @@ -268,14 +269,14 @@ fun main(args: Array) { println("[$i] EXITING ${command.amount} with ref ${command.issueRef}") val cmd = command.startFlow(issuerRPCGBP) flowHandles["GBPExit"]?.add(cmd) - cmd.progress.subscribe({}, {})?.unsubscribe() + cmd.progress.notUsed() Unit }.generate(SplittableRandom()) issuerUSDEventGenerator.bankOfCordaExitGenerator.map { command -> println("[$i] EXITING ${command.amount} with ref ${command.issueRef}") val cmd = command.startFlow(issuerRPCUSD) flowHandles["USDExit"]?.add(cmd) - cmd.progress.subscribe({}, {})?.unsubscribe() + cmd.progress.notUsed() Unit }.generate(SplittableRandom()) } @@ -287,7 +288,7 @@ fun main(args: Array) { println("[$i] SENDING ${command.amount} from ${aliceRPC.nodeIdentity().legalIdentity} to ${command.recipient}") val cmd = command.startFlow(aliceRPC) flowHandles["Alice"]?.add(cmd) - cmd.progress.subscribe({}, {})?.unsubscribe() + cmd.progress.notUsed() Unit }.generate(SplittableRandom()) @@ -296,7 +297,7 @@ fun main(args: Array) { println("[$i] SENDING ${command.amount} from ${bobRPC.nodeIdentity().legalIdentity} to ${command.recipient}") val cmd = command.startFlow(bobRPC) flowHandles["Bob"]?.add(cmd) - cmd.progress.subscribe({}, {})?.unsubscribe() + cmd.progress.notUsed() Unit }.generate(SplittableRandom()) }