CORDA-3236 fix observables not being tagged with notUsed() (#5557)

* Fixed not utilising the observables being returned by stateMachines added response with a notUsed(). Opening a ticket for implementation investigation.

* stateMachinesFeed will unsubscribe on interrupt rather than remaining infinitely subscribed.

* Fixed reported detekt issues on the InteractiveShell.

* Changes according to PR review.
This commit is contained in:
Stefan Iliev
2019-10-10 14:24:32 +01:00
committed by bpaunescu
parent 22a8108099
commit 48fd78d059
3 changed files with 50 additions and 20 deletions

View File

@ -166,6 +166,7 @@ class RPCClientProxyHandler(
val onObservableRemove = RemovalListener<InvocationId, UnicastSubject<Notification<*>>> { key, _, cause -> val onObservableRemove = RemovalListener<InvocationId, UnicastSubject<Notification<*>>> { key, _, cause ->
val observableId = key!! val observableId = key!!
val rpcCallSite: CallSite? = callSiteMap?.remove(observableId) val rpcCallSite: CallSite? = callSiteMap?.remove(observableId)
if (cause == RemovalCause.COLLECTED) { if (cause == RemovalCause.COLLECTED) {
log.warn(listOf( log.warn(listOf(
"A hot observable returned from an RPC was never subscribed to.", "A hot observable returned from an RPC was never subscribed to.",

View File

@ -150,6 +150,7 @@ internal class CordaRPCOpsImpl(
override fun killFlow(id: StateMachineRunId): Boolean = if (smm.killFlow(id)) true else smm.flowHospital.dropSessionInit(id.uuid) override fun killFlow(id: StateMachineRunId): Boolean = if (smm.killFlow(id)) true else smm.flowHospital.dropSessionInit(id.uuid)
override fun stateMachinesFeed(): DataFeed<List<StateMachineInfo>, StateMachineUpdate> { override fun stateMachinesFeed(): DataFeed<List<StateMachineInfo>, StateMachineUpdate> {
val (allStateMachines, changes) = smm.track() val (allStateMachines, changes) = smm.track()
return DataFeed( return DataFeed(
allStateMachines.map { stateMachineInfoFromFlowLogic(it) }, allStateMachines.map { stateMachineInfoFromFlowLogic(it) },

View File

@ -13,6 +13,7 @@ import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.GracefulReconnect import net.corda.client.rpc.GracefulReconnect
import net.corda.client.rpc.PermissionException import net.corda.client.rpc.PermissionException
import net.corda.client.rpc.notUsed
import net.corda.core.CordaException import net.corda.core.CordaException
import net.corda.core.concurrent.CordaFuture import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.UniqueIdentifier import net.corda.core.contracts.UniqueIdentifier
@ -70,6 +71,7 @@ import kotlin.concurrent.thread
// TODO: Resurrect or reimplement the mail plugin. // TODO: Resurrect or reimplement the mail plugin.
// TODO: Make it notice new shell commands added after the node started. // TODO: Make it notice new shell commands added after the node started.
@Suppress("MaxLineLength")
object InteractiveShell { object InteractiveShell {
private val log = LoggerFactory.getLogger(javaClass) private val log = LoggerFactory.getLogger(javaClass)
private lateinit var rpcOps: (username: String, password: String) -> InternalCordaRPCOps private lateinit var rpcOps: (username: String, password: String) -> InternalCordaRPCOps
@ -521,8 +523,11 @@ object InteractiveShell {
val parser = StringToMethodCallParser(CordaRPCOps::class.java, inputObjectMapper) val parser = StringToMethodCallParser(CordaRPCOps::class.java, inputObjectMapper)
val call = parser.parse(cordaRPCOps, cmd) val call = parser.parse(cordaRPCOps, cmd)
result = call.call() result = call.call()
var subscription : Subscriber<*>? = null
if (result != null && result !== kotlin.Unit && result !is Void) { if (result != null && result !== kotlin.Unit && result !is Void) {
result = printAndFollowRPCResponse(result, out, outputFormat) val (subs, future) = printAndFollowRPCResponse(result, out, outputFormat)
subscription = subs
result = future
} }
if (result is Future<*>) { if (result is Future<*>) {
if (!result.isDone) { if (!result.isDone) {
@ -532,6 +537,7 @@ object InteractiveShell {
try { try {
result = result.get() result = result.get()
} catch (e: InterruptedException) { } catch (e: InterruptedException) {
subscription?.unsubscribe()
Thread.currentThread().interrupt() Thread.currentThread().interrupt()
} catch (e: ExecutionException) { } catch (e: ExecutionException) {
throw e.rootCause throw e.rootCause
@ -621,7 +627,11 @@ object InteractiveShell {
} }
} }
private fun printAndFollowRPCResponse(response: Any?, out: PrintWriter, outputFormat: OutputFormat): CordaFuture<Unit> { private fun printAndFollowRPCResponse(
response: Any?,
out: PrintWriter,
outputFormat: OutputFormat
): Pair<PrintingSubscriber?, CordaFuture<Unit>> {
val outputMapper = createOutputMapper(outputFormat) val outputMapper = createOutputMapper(outputFormat)
val mapElement: (Any?) -> String = { element -> outputMapper.writerWithDefaultPrettyPrinter().writeValueAsString(element) } val mapElement: (Any?) -> String = { element -> outputMapper.writerWithDefaultPrettyPrinter().writeValueAsString(element) }
@ -659,34 +669,52 @@ object InteractiveShell {
} }
} }
private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture<Unit> { private fun maybeFollow(
response: Any?,
printerFun: (Any?) -> String,
out: PrintWriter
): Pair<PrintingSubscriber?, CordaFuture<Unit>> {
// Match on a couple of common patterns for "important" observables. It's tough to do this in a generic // Match on a couple of common patterns for "important" observables. It's tough to do this in a generic
// way because observables can be embedded anywhere in the object graph, and can emit other arbitrary // way because observables can be embedded anywhere in the object graph, and can emit other arbitrary
// object graphs that contain yet more observables. So we just look for top level responses that follow // object graphs that contain yet more observables. So we just look for top level responses that follow
// the standard "track" pattern, and print them until the user presses Ctrl-C // the standard "track" pattern, and print them until the user presses Ctrl-C
if (response == null) return doneFuture(Unit) var result = Pair<PrintingSubscriber?, CordaFuture<Unit>>(null, doneFuture(Unit))
if (response is DataFeed<*, *>) {
out.println("Snapshot:") when {
out.println(printerFun(response.snapshot)) response is DataFeed<*, *> -> {
out.flush() out.println("Snapshot:")
out.println("Updates:") out.println(printerFun(response.snapshot))
return printNextElements(response.updates, printerFun, out) out.flush()
out.println("Updates:")
val unsubscribeAndPrint: (Any?) -> String = { resp ->
if (resp is StateMachineUpdate.Added) {
resp.stateMachineInfo.progressTrackerStepAndUpdates?.updates?.notUsed()
}
printerFun(resp)
}
result = printNextElements(response.updates, unsubscribeAndPrint, out)
}
response is Observable<*> -> {
result = printNextElements(response, printerFun, out)
}
response != null -> {
out.println(printerFun(response))
}
} }
if (response is Observable<*>) { return result
return printNextElements(response, printerFun, out)
}
out.println(printerFun(response))
return doneFuture(Unit)
} }
private fun printNextElements(elements: Observable<*>, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture<Unit> { private fun printNextElements(
elements: Observable<*>,
printerFun: (Any?) -> String,
out: PrintWriter
): Pair<PrintingSubscriber?, CordaFuture<Unit>> {
val subscriber = PrintingSubscriber(printerFun, out) val subscriber = PrintingSubscriber(printerFun, out)
uncheckedCast(elements).subscribe(subscriber) uncheckedCast(elements).subscribe(subscriber)
return subscriber.future return Pair(subscriber, subscriber.future)
} }
} }