Observable.subscribe().unsubscribe() and ListenableFuture.cancel() (#397)

Observable.subscribe().unsubscribe() dance to free up the MQ resources server-side.

* remove an unused import

* implement a FlowHandle<Transaction>.finalize method

* Rename finalize() to discard() - remove the collection and run discard individually

* Remove unused imports

* Observable.notUsed helper function

* Tweaks to comments

* FlowHandle implements AutoClosable

* Resolving conflicts and move notUsed to RPC module

* Copy Observable.notUsed in core module.

* delete discard method
This commit is contained in:
Konstantinos Chalkias 2017-03-31 10:08:12 +01:00 committed by GitHub
parent b62f901892
commit d72b75caa4
3 changed files with 55 additions and 9 deletions

View File

@ -0,0 +1,19 @@
package net.corda.client.rpc
import rx.Observable
/**
* This function should be invoked on any unwanted Observables returned from RPC to release the server resources.
*
* subscribe({}, {}) was used instead of simply calling subscribe()
* because if an {@code onError} emission arrives (eg. due to an non-correct transaction, such as 'Not sufficient funds')
* then {@link OnErrorNotImplementedException} is thrown. As we won't handle exceptions from unused Observables,
* empty inputs are used to subscribe({}, {}).
*/
fun <T> Observable<T>.notUsed() {
try {
this.subscribe({}, {}).unsubscribe()
} catch (e: Exception) {
// Swallow any other exceptions as well.
}
}

View File

@ -219,8 +219,34 @@ inline fun <T : Any, A, B, C, D, reified R : FlowLogic<T>> CordaRPCOps.startFlow
* @param returnValue A [ListenableFuture] of the flow's return value.
*/
@CordaSerializable
data class FlowHandle<A>(
data class FlowHandle<A> (
val id: StateMachineRunId,
val progress: Observable<String>,
val returnValue: ListenableFuture<A>
)
val returnValue: ListenableFuture<A>) : AutoCloseable {
/**
* Use this function for flows that returnValue and progress are not going to be used or tracked, so as to free up server resources.
* Note that it won't really close if one subscribes on progress [Observable], but then forgets to unsubscribe.
*/
override fun close() {
returnValue.cancel(false)
progress.notUsed()
}
}
/**
* This function should be invoked on any unwanted Observables returned from RPC to release the server resources.
* TODO: Delete this function when this file is moved to RPC module, as Observable<T>.notUsed() exists there already.
*
* subscribe({}, {}) was used instead of simply calling subscribe()
* because if an {@code onError} emission arrives (eg. due to an non-correct transaction, such as 'Not sufficient funds')
* then {@link OnErrorNotImplementedException} is thrown. As we won't handle exceptions from unused Observables,
* empty inputs are used to subscribe({}, {}).
*/
fun <T> Observable<T>.notUsed() {
try {
this.subscribe({}, {}).unsubscribe()
} catch (e: Exception) {
// Swallow any other exceptions as well; we won't handle exceptions from unused Observables.
}
}

View File

@ -12,6 +12,7 @@ import joptsimple.OptionParser
import net.corda.client.jfx.model.Models
import net.corda.client.jfx.model.observableValue
import net.corda.client.mock.EventGenerator
import net.corda.client.rpc.notUsed
import net.corda.contracts.asset.Cash
import net.corda.core.contracts.GBP
import net.corda.core.contracts.USD
@ -250,14 +251,14 @@ fun main(args: Array<String>) {
println("[$i] ISSUING ${command.amount} with ref ${command.issueRef} to ${command.recipient}")
val cmd = command.startFlow(issuerRPCGBP)
flowHandles["GBPIssuer"]?.add(cmd)
cmd.progress.subscribe({}, {})?.unsubscribe()
cmd.progress.notUsed()
Unit
}.generate(SplittableRandom())
issuerUSDEventGenerator.bankOfCordaIssueGenerator.map { command ->
println("[$i] ISSUING ${command.amount} with ref ${command.issueRef} to ${command.recipient}")
val cmd = command.startFlow(issuerRPCUSD)
flowHandles["USDIssuer"]?.add(cmd)
cmd.progress.subscribe({}, {})?.unsubscribe()
cmd.progress.notUsed()
Unit
}.generate(SplittableRandom())
}
@ -268,14 +269,14 @@ fun main(args: Array<String>) {
println("[$i] EXITING ${command.amount} with ref ${command.issueRef}")
val cmd = command.startFlow(issuerRPCGBP)
flowHandles["GBPExit"]?.add(cmd)
cmd.progress.subscribe({}, {})?.unsubscribe()
cmd.progress.notUsed()
Unit
}.generate(SplittableRandom())
issuerUSDEventGenerator.bankOfCordaExitGenerator.map { command ->
println("[$i] EXITING ${command.amount} with ref ${command.issueRef}")
val cmd = command.startFlow(issuerRPCUSD)
flowHandles["USDExit"]?.add(cmd)
cmd.progress.subscribe({}, {})?.unsubscribe()
cmd.progress.notUsed()
Unit
}.generate(SplittableRandom())
}
@ -287,7 +288,7 @@ fun main(args: Array<String>) {
println("[$i] SENDING ${command.amount} from ${aliceRPC.nodeIdentity().legalIdentity} to ${command.recipient}")
val cmd = command.startFlow(aliceRPC)
flowHandles["Alice"]?.add(cmd)
cmd.progress.subscribe({}, {})?.unsubscribe()
cmd.progress.notUsed()
Unit
}.generate(SplittableRandom())
@ -296,7 +297,7 @@ fun main(args: Array<String>) {
println("[$i] SENDING ${command.amount} from ${bobRPC.nodeIdentity().legalIdentity} to ${command.recipient}")
val cmd = command.startFlow(bobRPC)
flowHandles["Bob"]?.add(cmd)
cmd.progress.subscribe({}, {})?.unsubscribe()
cmd.progress.notUsed()
Unit
}.generate(SplittableRandom())
}