diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt index a3b717b76e..ee2f5052a6 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt @@ -166,6 +166,7 @@ class RPCClientProxyHandler( val onObservableRemove = RemovalListener>> { key, _, cause -> val observableId = key!! val rpcCallSite: CallSite? = callSiteMap?.remove(observableId) + if (cause == RemovalCause.COLLECTED) { log.warn(listOf( "A hot observable returned from an RPC was never subscribed to.", diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index b02d0a6c76..d1f2f7e7b2 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -150,6 +150,7 @@ internal class CordaRPCOpsImpl( override fun killFlow(id: StateMachineRunId): Boolean = if (smm.killFlow(id)) true else smm.flowHospital.dropSessionInit(id.uuid) override fun stateMachinesFeed(): DataFeed, StateMachineUpdate> { + val (allStateMachines, changes) = smm.track() return DataFeed( allStateMachines.map { stateMachineInfoFromFlowLogic(it) }, diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt index ac5edb6e7e..a78edf970b 100644 --- a/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt @@ -13,6 +13,7 @@ import net.corda.client.rpc.CordaRPCClient import net.corda.client.rpc.CordaRPCClientConfiguration import net.corda.client.rpc.GracefulReconnect import net.corda.client.rpc.PermissionException +import net.corda.client.rpc.notUsed import net.corda.core.CordaException import net.corda.core.concurrent.CordaFuture import net.corda.core.contracts.UniqueIdentifier @@ -70,6 +71,7 @@ import kotlin.concurrent.thread // TODO: Resurrect or reimplement the mail plugin. // TODO: Make it notice new shell commands added after the node started. +@Suppress("MaxLineLength") object InteractiveShell { private val log = LoggerFactory.getLogger(javaClass) private lateinit var rpcOps: (username: String, password: String) -> InternalCordaRPCOps @@ -521,8 +523,11 @@ object InteractiveShell { val parser = StringToMethodCallParser(CordaRPCOps::class.java, inputObjectMapper) val call = parser.parse(cordaRPCOps, cmd) result = call.call() + var subscription : Subscriber<*>? = null if (result != null && result !== kotlin.Unit && result !is Void) { - result = printAndFollowRPCResponse(result, out, outputFormat) + val (subs, future) = printAndFollowRPCResponse(result, out, outputFormat) + subscription = subs + result = future } if (result is Future<*>) { if (!result.isDone) { @@ -532,6 +537,7 @@ object InteractiveShell { try { result = result.get() } catch (e: InterruptedException) { + subscription?.unsubscribe() Thread.currentThread().interrupt() } catch (e: ExecutionException) { throw e.rootCause @@ -621,7 +627,11 @@ object InteractiveShell { } } - private fun printAndFollowRPCResponse(response: Any?, out: PrintWriter, outputFormat: OutputFormat): CordaFuture { + private fun printAndFollowRPCResponse( + response: Any?, + out: PrintWriter, + outputFormat: OutputFormat + ): Pair> { val outputMapper = createOutputMapper(outputFormat) val mapElement: (Any?) -> String = { element -> outputMapper.writerWithDefaultPrettyPrinter().writeValueAsString(element) } @@ -659,34 +669,52 @@ object InteractiveShell { } } - private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture { + private fun maybeFollow( + response: Any?, + printerFun: (Any?) -> String, + out: PrintWriter + ): Pair> { // Match on a couple of common patterns for "important" observables. It's tough to do this in a generic // way because observables can be embedded anywhere in the object graph, and can emit other arbitrary // object graphs that contain yet more observables. So we just look for top level responses that follow // the standard "track" pattern, and print them until the user presses Ctrl-C - if (response == null) return doneFuture(Unit) + var result = Pair>(null, doneFuture(Unit)) - if (response is DataFeed<*, *>) { - out.println("Snapshot:") - out.println(printerFun(response.snapshot)) - out.flush() - out.println("Updates:") - return printNextElements(response.updates, printerFun, out) + + when { + response is DataFeed<*, *> -> { + out.println("Snapshot:") + out.println(printerFun(response.snapshot)) + out.flush() + out.println("Updates:") + + val unsubscribeAndPrint: (Any?) -> String = { resp -> + if (resp is StateMachineUpdate.Added) { + resp.stateMachineInfo.progressTrackerStepAndUpdates?.updates?.notUsed() + } + printerFun(resp) + } + + result = printNextElements(response.updates, unsubscribeAndPrint, out) + } + response is Observable<*> -> { + result = printNextElements(response, printerFun, out) + } + response != null -> { + out.println(printerFun(response)) + } } - if (response is Observable<*>) { - - return printNextElements(response, printerFun, out) - } - - out.println(printerFun(response)) - return doneFuture(Unit) + return result } - private fun printNextElements(elements: Observable<*>, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture { - + private fun printNextElements( + elements: Observable<*>, + printerFun: (Any?) -> String, + out: PrintWriter + ): Pair> { val subscriber = PrintingSubscriber(printerFun, out) uncheckedCast(elements).subscribe(subscriber) - return subscriber.future + return Pair(subscriber, subscriber.future) } }