diff --git a/core/src/main/kotlin/net/corda/core/messaging/FlowHandle.kt b/core/src/main/kotlin/net/corda/core/messaging/FlowHandle.kt index 3d75d64ca1..4c558d9767 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/FlowHandle.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/FlowHandle.kt @@ -2,6 +2,7 @@ package net.corda.core.messaging import com.google.common.util.concurrent.ListenableFuture import net.corda.core.flows.StateMachineRunId +import net.corda.core.serialization.CordaSerializable import rx.Observable /** @@ -23,3 +24,37 @@ interface FlowHandle : AutoCloseable { interface FlowProgressHandle : FlowHandle { val progress: Observable } + + +@CordaSerializable +data class FlowHandleImpl( + override val id: StateMachineRunId, + override val returnValue: ListenableFuture) : FlowHandle { + + // Remember to add @Throws to FlowHandle.close() if this throws an exception. + override fun close() { + returnValue.cancel(false) + } +} + +@CordaSerializable +data class FlowProgressHandleImpl( + override val id: StateMachineRunId, + override val returnValue: ListenableFuture, + override val progress: Observable) : FlowProgressHandle { + + // Remember to add @Throws to FlowProgressHandle.close() if this throws an exception. + override fun close() { + progress.notUsed() + returnValue.cancel(false) + } +} + +// Private copy of the version in client:rpc. +private fun Observable.notUsed() { + try { + this.subscribe({}, {}).unsubscribe() + } catch (e: Exception) { + // Swallow any other exceptions as well. + } +} diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 00a89df830..1ba95a4bdd 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -6,7 +6,6 @@ import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.strands.Strand import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.SettableFuture -import net.corda.client.rpc.notUsed import net.corda.core.abbreviate import net.corda.core.crypto.Party import net.corda.core.crypto.SecureHash @@ -16,9 +15,9 @@ import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowStateMachine import net.corda.core.flows.StateMachineRunId import net.corda.core.messaging.FlowHandle -import net.corda.core.messaging.FlowProgressHandle +import net.corda.core.messaging.FlowHandleImpl +import net.corda.core.messaging.FlowProgressHandleImpl import net.corda.core.random63BitValue -import net.corda.core.serialization.CordaSerializable import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.UntrustworthyData @@ -430,35 +429,3 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, timer.update(duration, TimeUnit.NANOSECONDS) } } - -// I would prefer for [FlowProgressHandleImpl] to extend [FlowHandleImpl], -// but Kotlin doesn't allow this for data classes, not even to create -// another data class! -@CordaSerializable -private data class FlowHandleImpl( - override val id: StateMachineRunId, - override val returnValue: ListenableFuture) : FlowHandle { - - /** - * Use this function for flows whose returnValue is not going to be used, so as to free up server resources. - */ - override fun close() { - returnValue.cancel(false) - } -} - -@CordaSerializable -private data class FlowProgressHandleImpl ( - override val id: StateMachineRunId, - override val returnValue: ListenableFuture, - override val progress: Observable) : FlowProgressHandle { - - /** - * Use this function for flows whose returnValue and progress are not going to be used or tracked, so as to free up server resources. - * Note that it won't really close if one subscribes on progress [Observable], but then forgets to unsubscribe. - */ - override fun close() { - progress.notUsed() - returnValue.cancel(false) - } -}