CORDA-299: Remove progress Observable from FlowHandle, unless explicitly requested. (#513)

* Remove progress Observable from FlowHandle, unless explicitly requested.
* Refactor FlowHandle creation into FlowStateMachine.
* Prevent server-side queue subscription for dummy Observable.
* Refactor so that RPC client does not receive any unused progress Observables. This is the simplest way of ensuring we have no dangling "hot" Observables when the RPC client closes.
* Test flow has correct handle.
* Resolve some compiler warnings.
* Document how starting a flow does not involve progress tracking by default.
* Update changelog and release notes for RPC API.
* Rename new RPC API to startTrackedFlow().
* Remove optimisation because of its affect on the client-side.
* Update documentation.
This commit is contained in:
Chris Rankin 2017-04-19 20:11:51 +01:00 committed by GitHub
parent 7542d355a9
commit d2d7cbc9ec
12 changed files with 169 additions and 75 deletions

View File

@ -3,7 +3,10 @@ package net.corda.client.rpc
import net.corda.core.contracts.DOLLARS
import net.corda.core.flows.FlowException
import net.corda.core.getOrThrow
import net.corda.core.messaging.FlowHandle
import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.messaging.startFlow
import net.corda.core.messaging.startTrackedFlow
import net.corda.core.node.services.ServiceInfo
import net.corda.core.random63BitValue
import net.corda.core.serialization.OpaqueBytes
@ -21,8 +24,7 @@ import org.junit.After
import org.junit.Before
import org.junit.Test
import java.util.*
import kotlin.test.assertEquals
import kotlin.test.assertTrue
import kotlin.test.*
class CordaRPCClientTest : NodeBasedTest() {
private val rpcUser = User("user1", "test", permissions = setOf(
@ -69,7 +71,7 @@ class CordaRPCClientTest : NodeBasedTest() {
println("Creating proxy")
val proxy = client.proxy()
println("Starting flow")
val flowHandle = proxy.startFlow(
val flowHandle = proxy.startTrackedFlow(
::CashIssueFlow,
20.DOLLARS, OpaqueBytes.of(0), node.info.legalIdentity, node.info.legalIdentity)
println("Started flow, waiting on result")
@ -90,6 +92,16 @@ class CordaRPCClientTest : NodeBasedTest() {
}
}
@Test
fun `check basic flow has no progress`() {
client.start(rpcUser.username, rpcUser.password)
val proxy = client.proxy()
proxy.startFlow(::CashPaymentFlow, 100.DOLLARS, node.info.legalIdentity).use {
assertFalse(it is FlowProgressHandle<*>)
assertTrue(it is FlowHandle<*>)
}
}
@Test
fun `get cash balances`() {
println("Starting client")
@ -105,9 +117,7 @@ class CordaRPCClientTest : NodeBasedTest() {
node.info.legalIdentity, node.info.legalIdentity
)
println("Started issuing cash, waiting on result")
flowHandle.progress.subscribe {
println("CashIssue PROGRESS $it")
}
flowHandle.returnValue.get()
val finishCash = proxy.getCashBalances()
println("Cash Balances: $finishCash")

View File

@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Suspendable
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash
import net.corda.core.messaging.FlowHandle
import net.corda.core.node.ServiceHub
import net.corda.core.serialization.CordaSerializable
import net.corda.core.transactions.SignedTransaction
@ -41,6 +42,8 @@ interface FlowStateMachine<R> {
@Suspendable
fun waitForLedgerCommit(hash: SecureHash, sessionFlow: FlowLogic<*>): SignedTransaction
fun createHandle(hasProgress: Boolean): FlowHandle<R>
val serviceHub: ServiceHub
val logger: Logger
val id: StateMachineRunId

View File

@ -44,7 +44,7 @@ sealed class StateMachineUpdate {
/**
* RPC operations that the node exposes to clients using the Java client library. These can be called from
* client apps and are implemented by the node in the [CordaRPCOpsImpl] class.
* client apps and are implemented by the node in the [net.corda.node.internal.CordaRPCOpsImpl] class.
*/
// TODO: The use of Pairs throughout is unfriendly for Java interop.
@ -81,12 +81,18 @@ interface CordaRPCOps : RPCOps {
@RPCReturnsObservables
fun networkMapUpdates(): Pair<List<NodeInfo>, Observable<NetworkMapCache.MapChange>>
/**
* Start the given flow with the given arguments.
*/
@RPCReturnsObservables
fun <T : Any> startFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowHandle<T>
/**
* Start the given flow with the given arguments, returning an [Observable] with a single observation of the
* result of running the flow.
*/
@RPCReturnsObservables
fun <T : Any> startFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowHandle<T>
fun <T : Any> startTrackedFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowProgressHandle<T>
/**
* Returns Node's identity, assuming this will not change while the node is running.
@ -187,20 +193,20 @@ interface CordaRPCOps : RPCOps {
inline fun <T : Any, reified R : FlowLogic<T>> CordaRPCOps.startFlow(
@Suppress("UNUSED_PARAMETER")
flowConstructor: () -> R
) = startFlowDynamic(R::class.java)
): FlowHandle<T> = startFlowDynamic(R::class.java)
inline fun <T : Any, A, reified R : FlowLogic<T>> CordaRPCOps.startFlow(
@Suppress("UNUSED_PARAMETER")
flowConstructor: (A) -> R,
arg0: A
) = startFlowDynamic(R::class.java, arg0)
): FlowHandle<T> = startFlowDynamic(R::class.java, arg0)
inline fun <T : Any, A, B, reified R : FlowLogic<T>> CordaRPCOps.startFlow(
@Suppress("UNUSED_PARAMETER")
flowConstructor: (A, B) -> R,
arg0: A,
arg1: B
) = startFlowDynamic(R::class.java, arg0, arg1)
): FlowHandle<T> = startFlowDynamic(R::class.java, arg0, arg1)
inline fun <T : Any, A, B, C, reified R : FlowLogic<T>> CordaRPCOps.startFlow(
@Suppress("UNUSED_PARAMETER")
@ -208,7 +214,7 @@ inline fun <T : Any, A, B, C, reified R : FlowLogic<T>> CordaRPCOps.startFlow(
arg0: A,
arg1: B,
arg2: C
) = startFlowDynamic(R::class.java, arg0, arg1, arg2)
): FlowHandle<T> = startFlowDynamic(R::class.java, arg0, arg1, arg2)
inline fun <T : Any, A, B, C, D, reified R : FlowLogic<T>> CordaRPCOps.startFlow(
@Suppress("UNUSED_PARAMETER")
@ -217,44 +223,47 @@ inline fun <T : Any, A, B, C, D, reified R : FlowLogic<T>> CordaRPCOps.startFlow
arg1: B,
arg2: C,
arg3: D
) = startFlowDynamic(R::class.java, arg0, arg1, arg2, arg3)
): FlowHandle<T> = startFlowDynamic(R::class.java, arg0, arg1, arg2, arg3)
/**
* [FlowHandle] is a serialisable handle for the started flow, parameterised by the type of the flow's return value.
*
* @param id The started state machine's ID.
* @param progress The stream of progress tracker events.
* @param returnValue A [ListenableFuture] of the flow's return value.
* Same again, except this time with progress-tracking enabled.
*/
@CordaSerializable
data class FlowHandle<A>(
val id: StateMachineRunId,
val progress: Observable<String>,
val returnValue: ListenableFuture<A>) : AutoCloseable {
@Suppress("unused")
inline fun <T : Any, reified R : FlowLogic<T>> CordaRPCOps.startTrackedFlow(
@Suppress("unused_parameter")
flowConstructor: () -> R
): FlowProgressHandle<T> = startTrackedFlowDynamic(R::class.java)
/**
* Use this function for flows that returnValue and progress are not going to be used or tracked, so as to free up server resources.
* Note that it won't really close if one subscribes on progress [Observable], but then forgets to unsubscribe.
*/
override fun close() {
returnValue.cancel(false)
progress.notUsed()
}
}
@Suppress("unused")
inline fun <T : Any, A, reified R : FlowLogic<T>> CordaRPCOps.startTrackedFlow(
@Suppress("unused_parameter")
flowConstructor: (A) -> R,
arg0: A
): FlowProgressHandle<T> = startTrackedFlowDynamic(R::class.java, arg0)
/**
* This function should be invoked on any unwanted Observables returned from RPC to release the server resources.
* TODO: Delete this function when this file is moved to RPC module, as Observable<T>.notUsed() exists there already.
*
* subscribe({}, {}) was used instead of simply calling subscribe()
* because if an {@code onError} emission arrives (eg. due to an non-correct transaction, such as 'Not sufficient funds')
* then {@link OnErrorNotImplementedException} is thrown. As we won't handle exceptions from unused Observables,
* empty inputs are used to subscribe({}, {}).
*/
fun <T> Observable<T>.notUsed() {
try {
this.subscribe({}, {}).unsubscribe()
} catch (e: Exception) {
// Swallow any other exceptions as well; we won't handle exceptions from unused Observables.
}
}
@Suppress("unused")
inline fun <T : Any, A, B, reified R : FlowLogic<T>> CordaRPCOps.startTrackedFlow(
@Suppress("unused_parameter")
flowConstructor: (A, B) -> R,
arg0: A,
arg1: B
): FlowProgressHandle<T> = startTrackedFlowDynamic(R::class.java, arg0, arg1)
@Suppress("unused")
inline fun <T : Any, A, B, C, reified R : FlowLogic<T>> CordaRPCOps.startTrackedFlow(
@Suppress("unused_parameter")
flowConstructor: (A, B, C) -> R,
arg0: A,
arg1: B,
arg2: C
): FlowProgressHandle<T> = startTrackedFlowDynamic(R::class.java, arg0, arg1, arg2)
@Suppress("unused")
inline fun <T : Any, A, B, C, D, reified R : FlowLogic<T>> CordaRPCOps.startTrackedFlow(
@Suppress("unused_parameter")
flowConstructor: (A, B, C, D) -> R,
arg0: A,
arg1: B,
arg2: C,
arg3: D
): FlowProgressHandle<T> = startTrackedFlowDynamic(R::class.java, arg0, arg1, arg2, arg3)

View File

@ -0,0 +1,25 @@
package net.corda.core.messaging
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.flows.StateMachineRunId
import rx.Observable
/**
* [FlowHandle] is a serialisable handle for the started flow, parameterised by the type of the flow's return value.
*
* @property id The started state machine's ID.
* @property returnValue A [ListenableFuture] of the flow's return value.
*/
interface FlowHandle<A> : AutoCloseable {
val id: StateMachineRunId
val returnValue: ListenableFuture<A>
}
/**
* [FlowProgressHandle] is a serialisable handle for the started flow, parameterised by the type of the flow's return value.
*
* @property progress The stream of progress tracker events.
*/
interface FlowProgressHandle<A> : FlowHandle<A> {
val progress: Observable<String>
}

View File

@ -14,6 +14,8 @@ UNRELEASED
* DemoBench is now installed as ``Corda DemoBench`` instead of ``DemoBench``.
* Starting a flow no longer enables progress tracking by default. To enable it, you must now invoke your flow using one of the new ``CordaRPCOps.startTrackedFlow`` functions. ``FlowHandle`` is now an interface, and its ``progress: Observable`` field has been moved to the ``FlowProgressHandle`` child interface. Hence developers no longer need to invoke ``notUsed`` on their flows' unwanted progress-tracking observables.
Milestone 10.0
--------------

View File

@ -215,12 +215,14 @@ Java reflection ``Class`` object that describes the flow class to use (in this c
It also takes a set of arguments to pass to the constructor. Because it's possible for flow invocations to
be requested by untrusted code (e.g. a state that you have been sent), the types that can be passed into the
flow are checked against a whitelist, which can be extended by apps themselves at load time. There are also a series
of inlined extension functions of the form ``CordaRPCOps.startFlow`` which help with invoking flows in a type
of inlined Kotlin extension functions of the form ``CordaRPCOps.startFlow`` which help with invoking flows in a type
safe manner.
The process of starting a flow returns a ``FlowHandle`` that you can use to either observe
the result, observe its progress and which also contains a permanent identifier for the invoked flow in the form
of the ``StateMachineRunId``.
The process of starting a flow returns a ``FlowHandle`` that you can use to observe
the result, and which also contains a permanent identifier for the invoked flow in the form
of the ``StateMachineRunId``. Should you also wish to track the progress of your flow (see :ref:`progress-tracking`) then you can invoke your flow instead using ``CordaRPCOps.startTrackedFlowDynamic`` or any of its corresponding ``CordaRPCOps.startTrackedFlow`` extension functions. These will return a ``FlowProgressHandle``, which is just like a ``FlowHandle`` except that it also contains an observable ``progress`` field.
.. note:: The developer `must` then either subscribe to this ``progress`` observable or invoke the ``notUsed()`` extension function for it. Otherwise the unused observable will waste resources back in the node.
In a two party flow only one side is to be manually started using ``CordaRPCOps.startFlow``. The other side
has to be registered by its node to respond to the initiating flow via ``PluginServiceHub.registerFlowInitiator``.
@ -413,6 +415,8 @@ This code is longer but no more complicated. Here are some things to pay attenti
As you can see, the flow logic is straightforward and does not contain any callbacks or network glue code, despite
the fact that it takes minimal resources and can survive node restarts.
.. _progress-tracking:
Progress tracking
-----------------

View File

@ -4,14 +4,10 @@ import net.corda.core.contracts.Amount
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.UpgradedContract
import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.FlowHandle
import net.corda.core.messaging.StateMachineInfo
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.messaging.*
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.StateMachineTransactionMapping
@ -20,7 +16,6 @@ import net.corda.core.transactions.SignedTransaction
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.messaging.requirePermission
import net.corda.node.services.startFlowPermission
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.utilities.AddOrRemove
import net.corda.node.utilities.transaction
@ -41,7 +36,7 @@ class CordaRPCOpsImpl(
private val smm: StateMachineManager,
private val database: Database
) : CordaRPCOps {
override val protocolVersion: Int get() = 0
override val protocolVersion: Int = 0
override fun networkMapUpdates(): Pair<List<NodeInfo>, Observable<NetworkMapCache.MapChange>> {
return database.transaction {
@ -100,15 +95,16 @@ class CordaRPCOpsImpl(
}
}
// TODO: Check that this flow is annotated as being intended for RPC invocation
override fun <T : Any> startTrackedFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowProgressHandle<T> {
requirePermission(startFlowPermission(logicType))
return services.invokeFlowAsync(logicType, *args).createHandle(hasProgress = true) as FlowProgressHandle<T>
}
// TODO: Check that this flow is annotated as being intended for RPC invocation
override fun <T : Any> startFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowHandle<T> {
requirePermission(startFlowPermission(logicType))
val stateMachine = services.invokeFlowAsync(logicType, *args) as FlowStateMachineImpl<T>
return FlowHandle(
id = stateMachine.id,
progress = stateMachine.logic.track()?.second ?: Observable.empty(),
returnValue = stateMachine.resultFuture
)
return services.invokeFlowAsync(logicType, *args).createHandle(hasProgress = false)
}
override fun attachmentExists(id: SecureHash): Boolean {

View File

@ -6,6 +6,7 @@ import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.Strand
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.client.rpc.notUsed
import net.corda.core.abbreviate
import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash
@ -13,7 +14,10 @@ import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowStateMachine
import net.corda.core.flows.StateMachineRunId
import net.corda.core.messaging.FlowHandle
import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.random63BitValue
import net.corda.core.serialization.CordaSerializable
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.UntrustworthyData
@ -27,6 +31,7 @@ import org.jetbrains.exposed.sql.Transaction
import org.jetbrains.exposed.sql.transactions.TransactionManager
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import rx.Observable
import java.sql.Connection
import java.sql.SQLException
import java.util.*
@ -98,6 +103,15 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
logic.stateMachine = this
}
override fun createHandle(hasProgress: Boolean): FlowHandle<R> = if (hasProgress)
FlowProgressHandleImpl(
id = id,
returnValue = resultFuture,
progress = logic.track()?.second ?: Observable.empty()
)
else
FlowHandleImpl(id = id, returnValue = resultFuture)
@Suspendable
override fun run() {
createTransaction()
@ -410,3 +424,35 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
timer.update(duration, TimeUnit.NANOSECONDS)
}
}
// I would prefer for [FlowProgressHandleImpl] to extend [FlowHandleImpl],
// but Kotlin doesn't allow this for data classes, not even to create
// another data class!
@CordaSerializable
private data class FlowHandleImpl<A>(
override val id: StateMachineRunId,
override val returnValue: ListenableFuture<A>) : FlowHandle<A> {
/**
* Use this function for flows whose returnValue is not going to be used, so as to free up server resources.
*/
override fun close() {
returnValue.cancel(false)
}
}
@CordaSerializable
private data class FlowProgressHandleImpl<A> (
override val id: StateMachineRunId,
override val returnValue: ListenableFuture<A>,
override val progress: Observable<String>) : FlowProgressHandle<A> {
/**
* Use this function for flows whose returnValue and progress are not going to be used or tracked, so as to free up server resources.
* Note that it won't really close if one subscribes on progress [Observable], but then forgets to unsubscribe.
*/
override fun close() {
progress.notUsed()
returnValue.cancel(false)
}
}

View File

@ -29,13 +29,12 @@ import net.corda.testing.node.MockNetwork.MockNode
import net.corda.testing.sequence
import org.apache.commons.io.IOUtils
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.Assert.assertArrayEquals
import org.junit.Before
import org.junit.Test
import rx.Observable
import java.io.ByteArrayOutputStream
import java.util.*
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.*
class CordaRPCOpsImplTest {
@ -210,7 +209,7 @@ class CordaRPCOpsImplTest {
fun `can upload an attachment`() {
val inputJar = Thread.currentThread().contextClassLoader.getResourceAsStream(testJar)
val secureHash = rpc.uploadAttachment(inputJar)
assert(rpc.attachmentExists(secureHash))
assertTrue(rpc.attachmentExists(secureHash))
}
@Test
@ -223,6 +222,6 @@ class CordaRPCOpsImplTest {
IOUtils.copy(Thread.currentThread().contextClassLoader.getResourceAsStream(testJar), bufferFile)
IOUtils.copy(rpc.openAttachment(secureHash), bufferRpc)
assert(Arrays.equals(bufferFile.toByteArray(), bufferRpc.toByteArray()))
assertArrayEquals(bufferFile.toByteArray(), bufferRpc.toByteArray())
}
}

View File

@ -8,6 +8,7 @@ import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowStateMachine
import net.corda.core.flows.StateMachineRunId
import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.node.ServiceHub
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.DUMMY_PUBKEY_1
@ -84,6 +85,8 @@ class InteractiveShellTest {
throw UnsupportedOperationException("not implemented")
}
override fun createHandle(hasProgress: Boolean): FlowProgressHandle<Any?> = throw UnsupportedOperationException("not implemented")
override val serviceHub: ServiceHub
get() = throw UnsupportedOperationException()
override val logger: Logger

View File

@ -11,7 +11,7 @@ import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash
import net.corda.core.getOrThrow
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.core.messaging.startTrackedFlow
import net.corda.core.sizedInputStreamAndHash
import net.corda.core.utilities.DUMMY_NOTARY
import net.corda.core.utilities.DUMMY_NOTARY_KEY
@ -88,7 +88,7 @@ fun sender(rpc: CordaRPCOps, inputStream: InputStream, hash: SecureHash.SHA256)
// Send the transaction to the other recipient
val stx = ptx.toSignedTransaction()
println("Sending ${stx.id}")
val flowHandle = rpc.startFlow(::FinalityFlow, stx, setOf(otherSide))
val flowHandle = rpc.startTrackedFlow(::FinalityFlow, stx, setOf(otherSide))
flowHandle.progress.subscribe(::println)
flowHandle.returnValue.getOrThrow()
}

View File

@ -14,7 +14,6 @@ import net.corda.client.jfx.model.observableValue
import net.corda.client.mock.EventGenerator
import net.corda.client.mock.Generator
import net.corda.client.mock.pickOne
import net.corda.client.rpc.notUsed
import net.corda.contracts.asset.Cash
import net.corda.core.contracts.Amount
import net.corda.core.contracts.GBP
@ -229,7 +228,6 @@ fun main(args: Array<String>) {
// Log to logger when flow finish.
fun FlowHandle<SignedTransaction>.log(seq: Int, name: String) {
val out = "[$seq] $name $id :"
progress.notUsed()
returnValue.success {
Main.log.info("$out ${it.id} ${(it.tx.outputs.first().data as Cash.State).amount}")
}.failure {
@ -242,7 +240,6 @@ fun main(args: Array<String>) {
for (ref in 0..1) {
for ((currency, issuer) in issuers) {
CashFlowCommand.IssueCash(Amount(1_000_000, currency), OpaqueBytes(ByteArray(1, { ref.toByte() })), it, notaryNode.nodeInfo.notaryIdentity).startFlow(issuer)
.progress.notUsed()
}
}
}