From d2d7cbc9ecfc2b5255d2602a792631cffc09ece3 Mon Sep 17 00:00:00 2001 From: Chris Rankin Date: Wed, 19 Apr 2017 20:11:51 +0100 Subject: [PATCH] CORDA-299: Remove progress Observable from FlowHandle, unless explicitly requested. (#513) * Remove progress Observable from FlowHandle, unless explicitly requested. * Refactor FlowHandle creation into FlowStateMachine. * Prevent server-side queue subscription for dummy Observable. * Refactor so that RPC client does not receive any unused progress Observables. This is the simplest way of ensuring we have no dangling "hot" Observables when the RPC client closes. * Test flow has correct handle. * Resolve some compiler warnings. * Document how starting a flow does not involve progress tracking by default. * Update changelog and release notes for RPC API. * Rename new RPC API to startTrackedFlow(). * Remove optimisation because of its affect on the client-side. * Update documentation. --- .../corda/client/rpc/CordaRPCClientTest.kt | 22 +++-- .../net/corda/core/flows/FlowStateMachine.kt | 3 + .../net/corda/core/messaging/CordaRPCOps.kt | 93 ++++++++++--------- .../net/corda/core/messaging/FlowHandle.kt | 25 +++++ docs/source/changelog.rst | 2 + docs/source/flow-state-machines.rst | 12 ++- .../corda/node/internal/CordaRPCOpsImpl.kt | 22 ++--- .../statemachine/FlowStateMachineImpl.kt | 46 +++++++++ .../net/corda/node/CordaRPCOpsImplTest.kt | 9 +- .../net/corda/node/InteractiveShellTest.kt | 3 + .../corda/attachmentdemo/AttachmentDemo.kt | 4 +- .../main/kotlin/net/corda/explorer/Main.kt | 3 - 12 files changed, 169 insertions(+), 75 deletions(-) create mode 100644 core/src/main/kotlin/net/corda/core/messaging/FlowHandle.kt diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt index c4a8206b35..8ea64e56ab 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt @@ -3,7 +3,10 @@ package net.corda.client.rpc import net.corda.core.contracts.DOLLARS import net.corda.core.flows.FlowException import net.corda.core.getOrThrow +import net.corda.core.messaging.FlowHandle +import net.corda.core.messaging.FlowProgressHandle import net.corda.core.messaging.startFlow +import net.corda.core.messaging.startTrackedFlow import net.corda.core.node.services.ServiceInfo import net.corda.core.random63BitValue import net.corda.core.serialization.OpaqueBytes @@ -21,8 +24,7 @@ import org.junit.After import org.junit.Before import org.junit.Test import java.util.* -import kotlin.test.assertEquals -import kotlin.test.assertTrue +import kotlin.test.* class CordaRPCClientTest : NodeBasedTest() { private val rpcUser = User("user1", "test", permissions = setOf( @@ -69,7 +71,7 @@ class CordaRPCClientTest : NodeBasedTest() { println("Creating proxy") val proxy = client.proxy() println("Starting flow") - val flowHandle = proxy.startFlow( + val flowHandle = proxy.startTrackedFlow( ::CashIssueFlow, 20.DOLLARS, OpaqueBytes.of(0), node.info.legalIdentity, node.info.legalIdentity) println("Started flow, waiting on result") @@ -90,6 +92,16 @@ class CordaRPCClientTest : NodeBasedTest() { } } + @Test + fun `check basic flow has no progress`() { + client.start(rpcUser.username, rpcUser.password) + val proxy = client.proxy() + proxy.startFlow(::CashPaymentFlow, 100.DOLLARS, node.info.legalIdentity).use { + assertFalse(it is FlowProgressHandle<*>) + assertTrue(it is FlowHandle<*>) + } + } + @Test fun `get cash balances`() { println("Starting client") @@ -105,9 +117,7 @@ class CordaRPCClientTest : NodeBasedTest() { node.info.legalIdentity, node.info.legalIdentity ) println("Started issuing cash, waiting on result") - flowHandle.progress.subscribe { - println("CashIssue PROGRESS $it") - } + flowHandle.returnValue.get() val finishCash = proxy.getCashBalances() println("Cash Balances: $finishCash") diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowStateMachine.kt b/core/src/main/kotlin/net/corda/core/flows/FlowStateMachine.kt index 13eac83fd7..2ba7c2d021 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowStateMachine.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowStateMachine.kt @@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Suspendable import com.google.common.util.concurrent.ListenableFuture import net.corda.core.crypto.Party import net.corda.core.crypto.SecureHash +import net.corda.core.messaging.FlowHandle import net.corda.core.node.ServiceHub import net.corda.core.serialization.CordaSerializable import net.corda.core.transactions.SignedTransaction @@ -41,6 +42,8 @@ interface FlowStateMachine { @Suspendable fun waitForLedgerCommit(hash: SecureHash, sessionFlow: FlowLogic<*>): SignedTransaction + fun createHandle(hasProgress: Boolean): FlowHandle + val serviceHub: ServiceHub val logger: Logger val id: StateMachineRunId diff --git a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt index c3c78a8d30..1e2c76297a 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt @@ -44,7 +44,7 @@ sealed class StateMachineUpdate { /** * RPC operations that the node exposes to clients using the Java client library. These can be called from - * client apps and are implemented by the node in the [CordaRPCOpsImpl] class. + * client apps and are implemented by the node in the [net.corda.node.internal.CordaRPCOpsImpl] class. */ // TODO: The use of Pairs throughout is unfriendly for Java interop. @@ -81,12 +81,18 @@ interface CordaRPCOps : RPCOps { @RPCReturnsObservables fun networkMapUpdates(): Pair, Observable> + /** + * Start the given flow with the given arguments. + */ + @RPCReturnsObservables + fun startFlowDynamic(logicType: Class>, vararg args: Any?): FlowHandle + /** * Start the given flow with the given arguments, returning an [Observable] with a single observation of the * result of running the flow. */ @RPCReturnsObservables - fun startFlowDynamic(logicType: Class>, vararg args: Any?): FlowHandle + fun startTrackedFlowDynamic(logicType: Class>, vararg args: Any?): FlowProgressHandle /** * Returns Node's identity, assuming this will not change while the node is running. @@ -187,20 +193,20 @@ interface CordaRPCOps : RPCOps { inline fun > CordaRPCOps.startFlow( @Suppress("UNUSED_PARAMETER") flowConstructor: () -> R -) = startFlowDynamic(R::class.java) +): FlowHandle = startFlowDynamic(R::class.java) inline fun > CordaRPCOps.startFlow( @Suppress("UNUSED_PARAMETER") flowConstructor: (A) -> R, arg0: A -) = startFlowDynamic(R::class.java, arg0) +): FlowHandle = startFlowDynamic(R::class.java, arg0) inline fun > CordaRPCOps.startFlow( @Suppress("UNUSED_PARAMETER") flowConstructor: (A, B) -> R, arg0: A, arg1: B -) = startFlowDynamic(R::class.java, arg0, arg1) +): FlowHandle = startFlowDynamic(R::class.java, arg0, arg1) inline fun > CordaRPCOps.startFlow( @Suppress("UNUSED_PARAMETER") @@ -208,7 +214,7 @@ inline fun > CordaRPCOps.startFlow( arg0: A, arg1: B, arg2: C -) = startFlowDynamic(R::class.java, arg0, arg1, arg2) +): FlowHandle = startFlowDynamic(R::class.java, arg0, arg1, arg2) inline fun > CordaRPCOps.startFlow( @Suppress("UNUSED_PARAMETER") @@ -217,44 +223,47 @@ inline fun > CordaRPCOps.startFlow arg1: B, arg2: C, arg3: D -) = startFlowDynamic(R::class.java, arg0, arg1, arg2, arg3) +): FlowHandle = startFlowDynamic(R::class.java, arg0, arg1, arg2, arg3) /** - * [FlowHandle] is a serialisable handle for the started flow, parameterised by the type of the flow's return value. - * - * @param id The started state machine's ID. - * @param progress The stream of progress tracker events. - * @param returnValue A [ListenableFuture] of the flow's return value. + * Same again, except this time with progress-tracking enabled. */ -@CordaSerializable -data class FlowHandle( - val id: StateMachineRunId, - val progress: Observable, - val returnValue: ListenableFuture) : AutoCloseable { +@Suppress("unused") +inline fun > CordaRPCOps.startTrackedFlow( + @Suppress("unused_parameter") + flowConstructor: () -> R +): FlowProgressHandle = startTrackedFlowDynamic(R::class.java) - /** - * Use this function for flows that returnValue and progress are not going to be used or tracked, so as to free up server resources. - * Note that it won't really close if one subscribes on progress [Observable], but then forgets to unsubscribe. - */ - override fun close() { - returnValue.cancel(false) - progress.notUsed() - } -} +@Suppress("unused") +inline fun > CordaRPCOps.startTrackedFlow( + @Suppress("unused_parameter") + flowConstructor: (A) -> R, + arg0: A +): FlowProgressHandle = startTrackedFlowDynamic(R::class.java, arg0) -/** - * This function should be invoked on any unwanted Observables returned from RPC to release the server resources. - * TODO: Delete this function when this file is moved to RPC module, as Observable.notUsed() exists there already. - * - * subscribe({}, {}) was used instead of simply calling subscribe() - * because if an {@code onError} emission arrives (eg. due to an non-correct transaction, such as 'Not sufficient funds') - * then {@link OnErrorNotImplementedException} is thrown. As we won't handle exceptions from unused Observables, - * empty inputs are used to subscribe({}, {}). - */ -fun Observable.notUsed() { - try { - this.subscribe({}, {}).unsubscribe() - } catch (e: Exception) { - // Swallow any other exceptions as well; we won't handle exceptions from unused Observables. - } -} +@Suppress("unused") +inline fun > CordaRPCOps.startTrackedFlow( + @Suppress("unused_parameter") + flowConstructor: (A, B) -> R, + arg0: A, + arg1: B +): FlowProgressHandle = startTrackedFlowDynamic(R::class.java, arg0, arg1) + +@Suppress("unused") +inline fun > CordaRPCOps.startTrackedFlow( + @Suppress("unused_parameter") + flowConstructor: (A, B, C) -> R, + arg0: A, + arg1: B, + arg2: C +): FlowProgressHandle = startTrackedFlowDynamic(R::class.java, arg0, arg1, arg2) + +@Suppress("unused") +inline fun > CordaRPCOps.startTrackedFlow( + @Suppress("unused_parameter") + flowConstructor: (A, B, C, D) -> R, + arg0: A, + arg1: B, + arg2: C, + arg3: D +): FlowProgressHandle = startTrackedFlowDynamic(R::class.java, arg0, arg1, arg2, arg3) diff --git a/core/src/main/kotlin/net/corda/core/messaging/FlowHandle.kt b/core/src/main/kotlin/net/corda/core/messaging/FlowHandle.kt new file mode 100644 index 0000000000..3d75d64ca1 --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/messaging/FlowHandle.kt @@ -0,0 +1,25 @@ +package net.corda.core.messaging + +import com.google.common.util.concurrent.ListenableFuture +import net.corda.core.flows.StateMachineRunId +import rx.Observable + +/** + * [FlowHandle] is a serialisable handle for the started flow, parameterised by the type of the flow's return value. + * + * @property id The started state machine's ID. + * @property returnValue A [ListenableFuture] of the flow's return value. + */ +interface FlowHandle : AutoCloseable { + val id: StateMachineRunId + val returnValue: ListenableFuture +} + +/** + * [FlowProgressHandle] is a serialisable handle for the started flow, parameterised by the type of the flow's return value. + * + * @property progress The stream of progress tracker events. + */ +interface FlowProgressHandle : FlowHandle { + val progress: Observable +} diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index d63efd67e1..ad7dd8c3a3 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -14,6 +14,8 @@ UNRELEASED * DemoBench is now installed as ``Corda DemoBench`` instead of ``DemoBench``. + * Starting a flow no longer enables progress tracking by default. To enable it, you must now invoke your flow using one of the new ``CordaRPCOps.startTrackedFlow`` functions. ``FlowHandle`` is now an interface, and its ``progress: Observable`` field has been moved to the ``FlowProgressHandle`` child interface. Hence developers no longer need to invoke ``notUsed`` on their flows' unwanted progress-tracking observables. + Milestone 10.0 -------------- diff --git a/docs/source/flow-state-machines.rst b/docs/source/flow-state-machines.rst index f83681771f..05d76a164f 100644 --- a/docs/source/flow-state-machines.rst +++ b/docs/source/flow-state-machines.rst @@ -215,12 +215,14 @@ Java reflection ``Class`` object that describes the flow class to use (in this c It also takes a set of arguments to pass to the constructor. Because it's possible for flow invocations to be requested by untrusted code (e.g. a state that you have been sent), the types that can be passed into the flow are checked against a whitelist, which can be extended by apps themselves at load time. There are also a series -of inlined extension functions of the form ``CordaRPCOps.startFlow`` which help with invoking flows in a type +of inlined Kotlin extension functions of the form ``CordaRPCOps.startFlow`` which help with invoking flows in a type safe manner. -The process of starting a flow returns a ``FlowHandle`` that you can use to either observe -the result, observe its progress and which also contains a permanent identifier for the invoked flow in the form -of the ``StateMachineRunId``. +The process of starting a flow returns a ``FlowHandle`` that you can use to observe +the result, and which also contains a permanent identifier for the invoked flow in the form +of the ``StateMachineRunId``. Should you also wish to track the progress of your flow (see :ref:`progress-tracking`) then you can invoke your flow instead using ``CordaRPCOps.startTrackedFlowDynamic`` or any of its corresponding ``CordaRPCOps.startTrackedFlow`` extension functions. These will return a ``FlowProgressHandle``, which is just like a ``FlowHandle`` except that it also contains an observable ``progress`` field. + +.. note:: The developer `must` then either subscribe to this ``progress`` observable or invoke the ``notUsed()`` extension function for it. Otherwise the unused observable will waste resources back in the node. In a two party flow only one side is to be manually started using ``CordaRPCOps.startFlow``. The other side has to be registered by its node to respond to the initiating flow via ``PluginServiceHub.registerFlowInitiator``. @@ -413,6 +415,8 @@ This code is longer but no more complicated. Here are some things to pay attenti As you can see, the flow logic is straightforward and does not contain any callbacks or network glue code, despite the fact that it takes minimal resources and can survive node restarts. +.. _progress-tracking: + Progress tracking ----------------- diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index 8cc32ebb4d..b999e0af7b 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -4,14 +4,10 @@ import net.corda.core.contracts.Amount import net.corda.core.contracts.ContractState import net.corda.core.contracts.StateAndRef import net.corda.core.contracts.UpgradedContract -import net.corda.core.crypto.Party import net.corda.core.crypto.SecureHash import net.corda.core.flows.FlowLogic import net.corda.core.flows.StateMachineRunId -import net.corda.core.messaging.CordaRPCOps -import net.corda.core.messaging.FlowHandle -import net.corda.core.messaging.StateMachineInfo -import net.corda.core.messaging.StateMachineUpdate +import net.corda.core.messaging.* import net.corda.core.node.NodeInfo import net.corda.core.node.services.NetworkMapCache import net.corda.core.node.services.StateMachineTransactionMapping @@ -20,7 +16,6 @@ import net.corda.core.transactions.SignedTransaction import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.messaging.requirePermission import net.corda.node.services.startFlowPermission -import net.corda.node.services.statemachine.FlowStateMachineImpl import net.corda.node.services.statemachine.StateMachineManager import net.corda.node.utilities.AddOrRemove import net.corda.node.utilities.transaction @@ -41,7 +36,7 @@ class CordaRPCOpsImpl( private val smm: StateMachineManager, private val database: Database ) : CordaRPCOps { - override val protocolVersion: Int get() = 0 + override val protocolVersion: Int = 0 override fun networkMapUpdates(): Pair, Observable> { return database.transaction { @@ -100,15 +95,16 @@ class CordaRPCOpsImpl( } } + // TODO: Check that this flow is annotated as being intended for RPC invocation + override fun startTrackedFlowDynamic(logicType: Class>, vararg args: Any?): FlowProgressHandle { + requirePermission(startFlowPermission(logicType)) + return services.invokeFlowAsync(logicType, *args).createHandle(hasProgress = true) as FlowProgressHandle + } + // TODO: Check that this flow is annotated as being intended for RPC invocation override fun startFlowDynamic(logicType: Class>, vararg args: Any?): FlowHandle { requirePermission(startFlowPermission(logicType)) - val stateMachine = services.invokeFlowAsync(logicType, *args) as FlowStateMachineImpl - return FlowHandle( - id = stateMachine.id, - progress = stateMachine.logic.track()?.second ?: Observable.empty(), - returnValue = stateMachine.resultFuture - ) + return services.invokeFlowAsync(logicType, *args).createHandle(hasProgress = false) } override fun attachmentExists(id: SecureHash): Boolean { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 0467398fb0..20188f0173 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -6,6 +6,7 @@ import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.strands.Strand import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.SettableFuture +import net.corda.client.rpc.notUsed import net.corda.core.abbreviate import net.corda.core.crypto.Party import net.corda.core.crypto.SecureHash @@ -13,7 +14,10 @@ import net.corda.core.flows.FlowException import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowStateMachine import net.corda.core.flows.StateMachineRunId +import net.corda.core.messaging.FlowHandle +import net.corda.core.messaging.FlowProgressHandle import net.corda.core.random63BitValue +import net.corda.core.serialization.CordaSerializable import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.UntrustworthyData @@ -27,6 +31,7 @@ import org.jetbrains.exposed.sql.Transaction import org.jetbrains.exposed.sql.transactions.TransactionManager import org.slf4j.Logger import org.slf4j.LoggerFactory +import rx.Observable import java.sql.Connection import java.sql.SQLException import java.util.* @@ -98,6 +103,15 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, logic.stateMachine = this } + override fun createHandle(hasProgress: Boolean): FlowHandle = if (hasProgress) + FlowProgressHandleImpl( + id = id, + returnValue = resultFuture, + progress = logic.track()?.second ?: Observable.empty() + ) + else + FlowHandleImpl(id = id, returnValue = resultFuture) + @Suspendable override fun run() { createTransaction() @@ -410,3 +424,35 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, timer.update(duration, TimeUnit.NANOSECONDS) } } + +// I would prefer for [FlowProgressHandleImpl] to extend [FlowHandleImpl], +// but Kotlin doesn't allow this for data classes, not even to create +// another data class! +@CordaSerializable +private data class FlowHandleImpl( + override val id: StateMachineRunId, + override val returnValue: ListenableFuture) : FlowHandle { + + /** + * Use this function for flows whose returnValue is not going to be used, so as to free up server resources. + */ + override fun close() { + returnValue.cancel(false) + } +} + +@CordaSerializable +private data class FlowProgressHandleImpl ( + override val id: StateMachineRunId, + override val returnValue: ListenableFuture, + override val progress: Observable) : FlowProgressHandle { + + /** + * Use this function for flows whose returnValue and progress are not going to be used or tracked, so as to free up server resources. + * Note that it won't really close if one subscribes on progress [Observable], but then forgets to unsubscribe. + */ + override fun close() { + progress.notUsed() + returnValue.cancel(false) + } +} diff --git a/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt b/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt index 989e457e4d..afa13c3d95 100644 --- a/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt +++ b/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt @@ -29,13 +29,12 @@ import net.corda.testing.node.MockNetwork.MockNode import net.corda.testing.sequence import org.apache.commons.io.IOUtils import org.assertj.core.api.Assertions.assertThatExceptionOfType +import org.junit.Assert.assertArrayEquals import org.junit.Before import org.junit.Test import rx.Observable import java.io.ByteArrayOutputStream -import java.util.* -import kotlin.test.assertEquals -import kotlin.test.assertFalse +import kotlin.test.* class CordaRPCOpsImplTest { @@ -210,7 +209,7 @@ class CordaRPCOpsImplTest { fun `can upload an attachment`() { val inputJar = Thread.currentThread().contextClassLoader.getResourceAsStream(testJar) val secureHash = rpc.uploadAttachment(inputJar) - assert(rpc.attachmentExists(secureHash)) + assertTrue(rpc.attachmentExists(secureHash)) } @Test @@ -223,6 +222,6 @@ class CordaRPCOpsImplTest { IOUtils.copy(Thread.currentThread().contextClassLoader.getResourceAsStream(testJar), bufferFile) IOUtils.copy(rpc.openAttachment(secureHash), bufferRpc) - assert(Arrays.equals(bufferFile.toByteArray(), bufferRpc.toByteArray())) + assertArrayEquals(bufferFile.toByteArray(), bufferRpc.toByteArray()) } } diff --git a/node/src/test/kotlin/net/corda/node/InteractiveShellTest.kt b/node/src/test/kotlin/net/corda/node/InteractiveShellTest.kt index 11ce7f8639..6cbecc6f8e 100644 --- a/node/src/test/kotlin/net/corda/node/InteractiveShellTest.kt +++ b/node/src/test/kotlin/net/corda/node/InteractiveShellTest.kt @@ -8,6 +8,7 @@ import net.corda.core.crypto.SecureHash import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowStateMachine import net.corda.core.flows.StateMachineRunId +import net.corda.core.messaging.FlowProgressHandle import net.corda.core.node.ServiceHub import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.DUMMY_PUBKEY_1 @@ -84,6 +85,8 @@ class InteractiveShellTest { throw UnsupportedOperationException("not implemented") } + override fun createHandle(hasProgress: Boolean): FlowProgressHandle = throw UnsupportedOperationException("not implemented") + override val serviceHub: ServiceHub get() = throw UnsupportedOperationException() override val logger: Logger diff --git a/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/AttachmentDemo.kt b/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/AttachmentDemo.kt index bb5358eed6..f9aa80867c 100644 --- a/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/AttachmentDemo.kt +++ b/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/AttachmentDemo.kt @@ -11,7 +11,7 @@ import net.corda.core.crypto.Party import net.corda.core.crypto.SecureHash import net.corda.core.getOrThrow import net.corda.core.messaging.CordaRPCOps -import net.corda.core.messaging.startFlow +import net.corda.core.messaging.startTrackedFlow import net.corda.core.sizedInputStreamAndHash import net.corda.core.utilities.DUMMY_NOTARY import net.corda.core.utilities.DUMMY_NOTARY_KEY @@ -88,7 +88,7 @@ fun sender(rpc: CordaRPCOps, inputStream: InputStream, hash: SecureHash.SHA256) // Send the transaction to the other recipient val stx = ptx.toSignedTransaction() println("Sending ${stx.id}") - val flowHandle = rpc.startFlow(::FinalityFlow, stx, setOf(otherSide)) + val flowHandle = rpc.startTrackedFlow(::FinalityFlow, stx, setOf(otherSide)) flowHandle.progress.subscribe(::println) flowHandle.returnValue.getOrThrow() } diff --git a/tools/explorer/src/main/kotlin/net/corda/explorer/Main.kt b/tools/explorer/src/main/kotlin/net/corda/explorer/Main.kt index 533d75871b..e972ce7242 100644 --- a/tools/explorer/src/main/kotlin/net/corda/explorer/Main.kt +++ b/tools/explorer/src/main/kotlin/net/corda/explorer/Main.kt @@ -14,7 +14,6 @@ import net.corda.client.jfx.model.observableValue import net.corda.client.mock.EventGenerator import net.corda.client.mock.Generator import net.corda.client.mock.pickOne -import net.corda.client.rpc.notUsed import net.corda.contracts.asset.Cash import net.corda.core.contracts.Amount import net.corda.core.contracts.GBP @@ -229,7 +228,6 @@ fun main(args: Array) { // Log to logger when flow finish. fun FlowHandle.log(seq: Int, name: String) { val out = "[$seq] $name $id :" - progress.notUsed() returnValue.success { Main.log.info("$out ${it.id} ${(it.tx.outputs.first().data as Cash.State).amount}") }.failure { @@ -242,7 +240,6 @@ fun main(args: Array) { for (ref in 0..1) { for ((currency, issuer) in issuers) { CashFlowCommand.IssueCash(Amount(1_000_000, currency), OpaqueBytes(ByteArray(1, { ref.toByte() })), it, notaryNode.nodeInfo.notaryIdentity).startFlow(issuer) - .progress.notUsed() } } }