CORDA-2669 - Reintroduce pendingFlowsCount (#4806) (#4807)

* CORDA-2669 - pendingFlowsCount not in public API

Reintroduce `pendingFlowsCount` to public API (as deprecated). Advise
to use the `gracefulShutdown` command in the shell instead.

* CORDA-2669 - Add pendingFlowsCount to api-current.txt
This commit is contained in:
Tommy Lillehagen 2019-02-23 23:51:21 +00:00 committed by Katelyn Baker
parent efe8a25138
commit 587e57579a
5 changed files with 37 additions and 42 deletions

View File

@ -3147,6 +3147,8 @@ public interface net.corda.core.messaging.CordaRPCOps extends net.corda.core.mes
public abstract net.corda.core.identity.Party wellKnownPartyFromX500Name(net.corda.core.identity.CordaX500Name) public abstract net.corda.core.identity.Party wellKnownPartyFromX500Name(net.corda.core.identity.CordaX500Name)
## ##
public final class net.corda.core.messaging.CordaRPCOpsKt extends java.lang.Object public final class net.corda.core.messaging.CordaRPCOpsKt extends java.lang.Object
@NotNull
public static final net.corda.core.messaging.DataFeed<Integer, kotlin.Pair<Integer, Integer>> pendingFlowsCount(net.corda.core.messaging.CordaRPCOps)
## ##
@CordaSerializable @CordaSerializable
public final class net.corda.core.messaging.DataFeed extends java.lang.Object public final class net.corda.core.messaging.DataFeed extends java.lang.Object

View File

@ -22,6 +22,8 @@ import net.corda.core.serialization.CordaSerializable
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.Try import net.corda.core.utilities.Try
import rx.Observable import rx.Observable
import rx.schedulers.Schedulers
import rx.subjects.PublishSubject
import java.io.IOException import java.io.IOException
import java.io.InputStream import java.io.InputStream
import java.security.PublicKey import java.security.PublicKey
@ -420,6 +422,38 @@ interface CordaRPCOps : RPCOps {
fun isWaitingForShutdown(): Boolean fun isWaitingForShutdown(): Boolean
} }
/**
* Returns a [DataFeed] of the number of pending flows. The [Observable] for the updates will complete the moment all pending flows will have terminated.
*/
@Deprecated("For automated upgrades, consider using the `gracefulShutdown` command in an SSH session instead.")
fun CordaRPCOps.pendingFlowsCount(): DataFeed<Int, Pair<Int, Int>> {
val updates = PublishSubject.create<Pair<Int, Int>>()
val initialPendingFlowsCount = stateMachinesFeed().let {
var completedFlowsCount = 0
var pendingFlowsCount = it.snapshot.size
it.updates.observeOn(Schedulers.io()).subscribe({ update ->
when (update) {
is StateMachineUpdate.Added -> {
pendingFlowsCount++
updates.onNext(completedFlowsCount to pendingFlowsCount)
}
is StateMachineUpdate.Removed -> {
completedFlowsCount++
updates.onNext(completedFlowsCount to pendingFlowsCount)
if (completedFlowsCount == pendingFlowsCount) {
updates.onCompleted()
}
}
}
}, updates::onError)
if (pendingFlowsCount == 0) {
updates.onCompleted()
}
pendingFlowsCount
}
return DataFeed(initialPendingFlowsCount, updates)
}
inline fun <reified T : ContractState> CordaRPCOps.vaultQueryBy(criteria: QueryCriteria = QueryCriteria.VaultQueryCriteria(), inline fun <reified T : ContractState> CordaRPCOps.vaultQueryBy(criteria: QueryCriteria = QueryCriteria.VaultQueryCriteria(),
paging: PageSpecification = PageSpecification(), paging: PageSpecification = PageSpecification(),
sorting: Sort = Sort(emptySet())): Vault.Page<T> { sorting: Sort = Sort(emptySet())): Vault.Page<T> {

View File

@ -1,45 +1,9 @@
package net.corda.nodeapi.internal package net.corda.nodeapi.internal
import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.StateMachineUpdate
import rx.Observable import rx.Observable
import rx.schedulers.Schedulers
import rx.subjects.PublishSubject
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
/**
* Returns a [DataFeed] of the number of pending flows. The [Observable] for the updates will complete the moment all pending flows will have terminated.
*/
fun CordaRPCOps.pendingFlowsCount(): DataFeed<Int, Pair<Int, Int>> {
val updates = PublishSubject.create<Pair<Int, Int>>()
val initialPendingFlowsCount = stateMachinesFeed().let {
var completedFlowsCount = 0
var pendingFlowsCount = it.snapshot.size
it.updates.observeOn(Schedulers.io()).subscribe({ update ->
when (update) {
is StateMachineUpdate.Added -> {
pendingFlowsCount++
updates.onNext(completedFlowsCount to pendingFlowsCount)
}
is StateMachineUpdate.Removed -> {
completedFlowsCount++
updates.onNext(completedFlowsCount to pendingFlowsCount)
if (completedFlowsCount == pendingFlowsCount) {
updates.onCompleted()
}
}
}
}, updates::onError)
if (pendingFlowsCount == 0) {
updates.onCompleted()
}
pendingFlowsCount
}
return DataFeed(initialPendingFlowsCount, updates)
}
/** /**
* Returns an [Observable] that will complete when the node will have cancelled the draining shutdown hook. * Returns an [Observable] that will complete when the node will have cancelled the draining shutdown hook.
* *

View File

@ -35,7 +35,6 @@ import net.corda.node.services.rpc.context
import net.corda.node.services.statemachine.StateMachineManager import net.corda.node.services.statemachine.StateMachineManager
import net.corda.nodeapi.exceptions.NonRpcFlowException import net.corda.nodeapi.exceptions.NonRpcFlowException
import net.corda.nodeapi.exceptions.RejectedCommandException import net.corda.nodeapi.exceptions.RejectedCommandException
import net.corda.nodeapi.internal.pendingFlowsCount
import rx.Observable import rx.Observable
import rx.Subscription import rx.Subscription
import java.io.InputStream import java.io.InputStream

View File

@ -19,11 +19,7 @@ import net.corda.core.flows.FlowLogic
import net.corda.core.internal.* import net.corda.core.internal.*
import net.corda.core.internal.concurrent.doneFuture import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.*
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.messaging.StateMachineUpdate
import net.corda.nodeapi.internal.pendingFlowsCount
import net.corda.tools.shell.utlities.ANSIProgressRenderer import net.corda.tools.shell.utlities.ANSIProgressRenderer
import net.corda.tools.shell.utlities.StdoutANSIProgressRenderer import net.corda.tools.shell.utlities.StdoutANSIProgressRenderer
import org.crsh.command.InvocationContext import org.crsh.command.InvocationContext