Move FlowHandle implementations back into core so that RPC clients can use them. (#653)

This commit is contained in:
Chris Rankin 2017-05-09 18:25:43 +01:00
parent d552037beb
commit 3eb14b6692
2 changed files with 37 additions and 35 deletions

View File

@ -2,6 +2,7 @@ package net.corda.core.messaging
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.flows.StateMachineRunId import net.corda.core.flows.StateMachineRunId
import net.corda.core.serialization.CordaSerializable
import rx.Observable import rx.Observable
/** /**
@ -23,3 +24,37 @@ interface FlowHandle<A> : AutoCloseable {
interface FlowProgressHandle<A> : FlowHandle<A> { interface FlowProgressHandle<A> : FlowHandle<A> {
val progress: Observable<String> val progress: Observable<String>
} }
@CordaSerializable
data class FlowHandleImpl<A>(
override val id: StateMachineRunId,
override val returnValue: ListenableFuture<A>) : FlowHandle<A> {
// Remember to add @Throws to FlowHandle.close() if this throws an exception.
override fun close() {
returnValue.cancel(false)
}
}
@CordaSerializable
data class FlowProgressHandleImpl<A>(
override val id: StateMachineRunId,
override val returnValue: ListenableFuture<A>,
override val progress: Observable<String>) : FlowProgressHandle<A> {
// Remember to add @Throws to FlowProgressHandle.close() if this throws an exception.
override fun close() {
progress.notUsed()
returnValue.cancel(false)
}
}
// Private copy of the version in client:rpc.
private fun <T> Observable<T>.notUsed() {
try {
this.subscribe({}, {}).unsubscribe()
} catch (e: Exception) {
// Swallow any other exceptions as well.
}
}

View File

@ -6,7 +6,6 @@ import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.Strand import co.paralleluniverse.strands.Strand
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture import com.google.common.util.concurrent.SettableFuture
import net.corda.client.rpc.notUsed
import net.corda.core.abbreviate import net.corda.core.abbreviate
import net.corda.core.crypto.Party import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
@ -16,9 +15,9 @@ import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowStateMachine import net.corda.core.flows.FlowStateMachine
import net.corda.core.flows.StateMachineRunId import net.corda.core.flows.StateMachineRunId
import net.corda.core.messaging.FlowHandle import net.corda.core.messaging.FlowHandle
import net.corda.core.messaging.FlowProgressHandle import net.corda.core.messaging.FlowHandleImpl
import net.corda.core.messaging.FlowProgressHandleImpl
import net.corda.core.random63BitValue import net.corda.core.random63BitValue
import net.corda.core.serialization.CordaSerializable
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.UntrustworthyData import net.corda.core.utilities.UntrustworthyData
@ -430,35 +429,3 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
timer.update(duration, TimeUnit.NANOSECONDS) timer.update(duration, TimeUnit.NANOSECONDS)
} }
} }
// I would prefer for [FlowProgressHandleImpl] to extend [FlowHandleImpl],
// but Kotlin doesn't allow this for data classes, not even to create
// another data class!
@CordaSerializable
private data class FlowHandleImpl<A>(
override val id: StateMachineRunId,
override val returnValue: ListenableFuture<A>) : FlowHandle<A> {
/**
* Use this function for flows whose returnValue is not going to be used, so as to free up server resources.
*/
override fun close() {
returnValue.cancel(false)
}
}
@CordaSerializable
private data class FlowProgressHandleImpl<A> (
override val id: StateMachineRunId,
override val returnValue: ListenableFuture<A>,
override val progress: Observable<String>) : FlowProgressHandle<A> {
/**
* Use this function for flows whose returnValue and progress are not going to be used or tracked, so as to free up server resources.
* Note that it won't really close if one subscribes on progress [Observable], but then forgets to unsubscribe.
*/
override fun close() {
progress.notUsed()
returnValue.cancel(false)
}
}