CORDA-2669 - Reintroduce pendingFlowsCount (#4806)

* CORDA-2669 - pendingFlowsCount not in public API

Reintroduce `pendingFlowsCount` to public API (as deprecated). Advise
to use the `gracefulShutdown` command in the shell instead.

* CORDA-2669 - Add pendingFlowsCount to api-current.txt
This commit is contained in:
Tommy Lillehagen
2019-02-23 17:10:32 +00:00
committed by GitHub
parent 9d04eccc8a
commit 8fb3d4dc01
5 changed files with 37 additions and 42 deletions

View File

@ -22,6 +22,8 @@ import net.corda.core.serialization.CordaSerializable
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.Try
import rx.Observable
import rx.schedulers.Schedulers
import rx.subjects.PublishSubject
import java.io.IOException
import java.io.InputStream
import java.security.PublicKey
@ -420,6 +422,38 @@ interface CordaRPCOps : RPCOps {
fun isWaitingForShutdown(): Boolean
}
/**
* Returns a [DataFeed] of the number of pending flows. The [Observable] for the updates will complete the moment all pending flows will have terminated.
*/
@Deprecated("For automated upgrades, consider using the `gracefulShutdown` command in an SSH session instead.")
fun CordaRPCOps.pendingFlowsCount(): DataFeed<Int, Pair<Int, Int>> {
val updates = PublishSubject.create<Pair<Int, Int>>()
val initialPendingFlowsCount = stateMachinesFeed().let {
var completedFlowsCount = 0
var pendingFlowsCount = it.snapshot.size
it.updates.observeOn(Schedulers.io()).subscribe({ update ->
when (update) {
is StateMachineUpdate.Added -> {
pendingFlowsCount++
updates.onNext(completedFlowsCount to pendingFlowsCount)
}
is StateMachineUpdate.Removed -> {
completedFlowsCount++
updates.onNext(completedFlowsCount to pendingFlowsCount)
if (completedFlowsCount == pendingFlowsCount) {
updates.onCompleted()
}
}
}
}, updates::onError)
if (pendingFlowsCount == 0) {
updates.onCompleted()
}
pendingFlowsCount
}
return DataFeed(initialPendingFlowsCount, updates)
}
inline fun <reified T : ContractState> CordaRPCOps.vaultQueryBy(criteria: QueryCriteria = QueryCriteria.VaultQueryCriteria(),
paging: PageSpecification = PageSpecification(),
sorting: Sort = Sort(emptySet())): Vault.Page<T> {