From 587e57579aa5ed1a4ea3f14a6b2a5bcc346391b5 Mon Sep 17 00:00:00 2001 From: Tommy Lillehagen Date: Sat, 23 Feb 2019 23:51:21 +0000 Subject: [PATCH] CORDA-2669 - Reintroduce pendingFlowsCount (#4806) (#4807) * CORDA-2669 - pendingFlowsCount not in public API Reintroduce `pendingFlowsCount` to public API (as deprecated). Advise to use the `gracefulShutdown` command in the shell instead. * CORDA-2669 - Add pendingFlowsCount to api-current.txt --- .ci/api-current.txt | 2 ++ .../net/corda/core/messaging/CordaRPCOps.kt | 34 ++++++++++++++++++ .../net/corda/nodeapi/internal/RpcHelpers.kt | 36 ------------------- .../corda/node/internal/CordaRPCOpsImpl.kt | 1 - .../net/corda/tools/shell/InteractiveShell.kt | 6 +--- 5 files changed, 37 insertions(+), 42 deletions(-) diff --git a/.ci/api-current.txt b/.ci/api-current.txt index 8505053a97..87186bf097 100644 --- a/.ci/api-current.txt +++ b/.ci/api-current.txt @@ -3147,6 +3147,8 @@ public interface net.corda.core.messaging.CordaRPCOps extends net.corda.core.mes public abstract net.corda.core.identity.Party wellKnownPartyFromX500Name(net.corda.core.identity.CordaX500Name) ## public final class net.corda.core.messaging.CordaRPCOpsKt extends java.lang.Object + @NotNull + public static final net.corda.core.messaging.DataFeed> pendingFlowsCount(net.corda.core.messaging.CordaRPCOps) ## @CordaSerializable public final class net.corda.core.messaging.DataFeed extends java.lang.Object diff --git a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt index 1b924610d7..ecc03e5763 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt @@ -22,6 +22,8 @@ import net.corda.core.serialization.CordaSerializable import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.Try import rx.Observable +import rx.schedulers.Schedulers +import rx.subjects.PublishSubject import java.io.IOException import java.io.InputStream import java.security.PublicKey @@ -420,6 +422,38 @@ interface CordaRPCOps : RPCOps { fun isWaitingForShutdown(): Boolean } +/** + * Returns a [DataFeed] of the number of pending flows. The [Observable] for the updates will complete the moment all pending flows will have terminated. + */ +@Deprecated("For automated upgrades, consider using the `gracefulShutdown` command in an SSH session instead.") +fun CordaRPCOps.pendingFlowsCount(): DataFeed> { + val updates = PublishSubject.create>() + val initialPendingFlowsCount = stateMachinesFeed().let { + var completedFlowsCount = 0 + var pendingFlowsCount = it.snapshot.size + it.updates.observeOn(Schedulers.io()).subscribe({ update -> + when (update) { + is StateMachineUpdate.Added -> { + pendingFlowsCount++ + updates.onNext(completedFlowsCount to pendingFlowsCount) + } + is StateMachineUpdate.Removed -> { + completedFlowsCount++ + updates.onNext(completedFlowsCount to pendingFlowsCount) + if (completedFlowsCount == pendingFlowsCount) { + updates.onCompleted() + } + } + } + }, updates::onError) + if (pendingFlowsCount == 0) { + updates.onCompleted() + } + pendingFlowsCount + } + return DataFeed(initialPendingFlowsCount, updates) +} + inline fun CordaRPCOps.vaultQueryBy(criteria: QueryCriteria = QueryCriteria.VaultQueryCriteria(), paging: PageSpecification = PageSpecification(), sorting: Sort = Sort(emptySet())): Vault.Page { diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/RpcHelpers.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/RpcHelpers.kt index e09112af4d..34567596da 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/RpcHelpers.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/RpcHelpers.kt @@ -1,45 +1,9 @@ package net.corda.nodeapi.internal import net.corda.core.messaging.CordaRPCOps -import net.corda.core.messaging.DataFeed -import net.corda.core.messaging.StateMachineUpdate import rx.Observable -import rx.schedulers.Schedulers -import rx.subjects.PublishSubject import java.util.concurrent.TimeUnit -/** - * Returns a [DataFeed] of the number of pending flows. The [Observable] for the updates will complete the moment all pending flows will have terminated. - */ -fun CordaRPCOps.pendingFlowsCount(): DataFeed> { - - val updates = PublishSubject.create>() - val initialPendingFlowsCount = stateMachinesFeed().let { - var completedFlowsCount = 0 - var pendingFlowsCount = it.snapshot.size - it.updates.observeOn(Schedulers.io()).subscribe({ update -> - when (update) { - is StateMachineUpdate.Added -> { - pendingFlowsCount++ - updates.onNext(completedFlowsCount to pendingFlowsCount) - } - is StateMachineUpdate.Removed -> { - completedFlowsCount++ - updates.onNext(completedFlowsCount to pendingFlowsCount) - if (completedFlowsCount == pendingFlowsCount) { - updates.onCompleted() - } - } - } - }, updates::onError) - if (pendingFlowsCount == 0) { - updates.onCompleted() - } - pendingFlowsCount - } - return DataFeed(initialPendingFlowsCount, updates) -} - /** * Returns an [Observable] that will complete when the node will have cancelled the draining shutdown hook. * diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index 80909e8fc4..ec15ef7b26 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -35,7 +35,6 @@ import net.corda.node.services.rpc.context import net.corda.node.services.statemachine.StateMachineManager import net.corda.nodeapi.exceptions.NonRpcFlowException import net.corda.nodeapi.exceptions.RejectedCommandException -import net.corda.nodeapi.internal.pendingFlowsCount import rx.Observable import rx.Subscription import java.io.InputStream diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt index 94868f2a68..bff6a27ece 100644 --- a/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt @@ -19,11 +19,7 @@ import net.corda.core.flows.FlowLogic import net.corda.core.internal.* import net.corda.core.internal.concurrent.doneFuture import net.corda.core.internal.concurrent.openFuture -import net.corda.core.messaging.CordaRPCOps -import net.corda.core.messaging.DataFeed -import net.corda.core.messaging.FlowProgressHandle -import net.corda.core.messaging.StateMachineUpdate -import net.corda.nodeapi.internal.pendingFlowsCount +import net.corda.core.messaging.* import net.corda.tools.shell.utlities.ANSIProgressRenderer import net.corda.tools.shell.utlities.StdoutANSIProgressRenderer import org.crsh.command.InvocationContext