Move FlowHandle implementations back into core so that RPC clients can use them. (#653)

This commit is contained in:
Chris Rankin 2017-05-09 18:25:43 +01:00 committed by GitHub
parent bb0ac3253d
commit d3075928b2
2 changed files with 35 additions and 30 deletions

View File

@ -2,6 +2,7 @@ package net.corda.core.messaging
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.flows.StateMachineRunId
import net.corda.core.serialization.CordaSerializable
import rx.Observable
/**
@ -35,3 +36,37 @@ interface FlowProgressHandle<A> : FlowHandle<A> {
*/
override fun close()
}
@CordaSerializable
data class FlowHandleImpl<A>(
override val id: StateMachineRunId,
override val returnValue: ListenableFuture<A>) : FlowHandle<A> {
// Remember to add @Throws to FlowHandle.close() if this throws an exception.
override fun close() {
returnValue.cancel(false)
}
}
@CordaSerializable
data class FlowProgressHandleImpl<A>(
override val id: StateMachineRunId,
override val returnValue: ListenableFuture<A>,
override val progress: Observable<String>) : FlowProgressHandle<A> {
// Remember to add @Throws to FlowProgressHandle.close() if this throws an exception.
override fun close() {
progress.notUsed()
returnValue.cancel(false)
}
}
// Private copy of the version in client:rpc.
private fun <T> Observable<T>.notUsed() {
try {
this.subscribe({}, {}).unsubscribe()
} catch (e: Exception) {
// Swallow any other exceptions as well.
}
}

View File

@ -1,7 +1,5 @@
package net.corda.node.internal
import com.google.common.util.concurrent.ListenableFuture
import net.corda.client.rpc.notUsed
import net.corda.core.contracts.Amount
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateAndRef
@ -9,7 +7,6 @@ import net.corda.core.contracts.UpgradedContract
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.messaging.*
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
@ -18,7 +15,6 @@ import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.PageSpecification
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.node.services.vault.Sort
import net.corda.core.serialization.CordaSerializable
import net.corda.core.transactions.SignedTransaction
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.messaging.getRpcContext
@ -194,30 +190,4 @@ class CordaRPCOpsImpl(
}
}
// I would prefer for [FlowProgressHandleImpl] to extend [FlowHandleImpl],
// but Kotlin doesn't allow this for data classes, not even to create
// another data class!
@CordaSerializable
private data class FlowHandleImpl<A>(
override val id: StateMachineRunId,
override val returnValue: ListenableFuture<A>) : FlowHandle<A> {
// Remember to add @Throws to FlowHandle.close if this throws an exception
override fun close() {
returnValue.cancel(false)
}
}
@CordaSerializable
private data class FlowProgressHandleImpl<A>(
override val id: StateMachineRunId,
override val returnValue: ListenableFuture<A>,
override val progress: Observable<String>) : FlowProgressHandle<A> {
// Remember to add @Throws to FlowProgressHandle.close if this throws an exception
override fun close() {
progress.notUsed()
returnValue.cancel(false)
}
}
}