Removed createHandle from the FlowStateMachine interface as it doesn't need to be publicly exposed

This commit is contained in:
Shams Asari
2017-05-03 16:06:30 +01:00
parent 18a0df4239
commit de83866ebe
9 changed files with 66 additions and 68 deletions

View File

@ -5,7 +5,6 @@ import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.contracts.ScheduledStateRef import net.corda.core.contracts.ScheduledStateRef
import net.corda.core.crypto.Party import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.messaging.FlowHandle
import net.corda.core.node.ServiceHub import net.corda.core.node.ServiceHub
import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.CordaSerializable
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
@ -16,12 +15,11 @@ import java.util.*
/** /**
* FlowInitiator holds information on who started the flow. We have different ways of doing that: via RPC [FlowInitiator.RPC], * FlowInitiator holds information on who started the flow. We have different ways of doing that: via RPC [FlowInitiator.RPC],
* communication started by peer node [FlowInitiator.Peer], scheduled flows [FlowInitiator.Scheduled] * communication started by peer node [FlowInitiator.Peer], scheduled flows [FlowInitiator.Scheduled]
* or manual [FlowInitiator.Manual]. The last case is for all flows started in tests, shell etc. It was added * or via the Corda Shell [FlowInitiator.Shell].
* because we can start flow directly using [StateMachineManager.add] or [ServiceHubInternal.startFlow].
*/ */
@CordaSerializable @CordaSerializable
sealed class FlowInitiator { sealed class FlowInitiator {
/** Started using [CordaRPCOps.startFlowDynamic]. */ /** Started using [net.corda.core.messaging.CordaRPCOps.startFlowDynamic]. */
data class RPC(val username: String) : FlowInitiator() data class RPC(val username: String) : FlowInitiator()
/** Started when we get new session initiation request. */ /** Started when we get new session initiation request. */
data class Peer(val party: Party) : FlowInitiator() data class Peer(val party: Party) : FlowInitiator()
@ -61,8 +59,6 @@ interface FlowStateMachine<R> {
@Suspendable @Suspendable
fun waitForLedgerCommit(hash: SecureHash, sessionFlow: FlowLogic<*>): SignedTransaction fun waitForLedgerCommit(hash: SecureHash, sessionFlow: FlowLogic<*>): SignedTransaction
fun createHandle(hasProgress: Boolean): FlowHandle<R>
val serviceHub: ServiceHub val serviceHub: ServiceHub
val logger: Logger val logger: Logger
val id: StateMachineRunId val id: StateMachineRunId

View File

@ -13,6 +13,11 @@ import rx.Observable
interface FlowHandle<A> : AutoCloseable { interface FlowHandle<A> : AutoCloseable {
val id: StateMachineRunId val id: StateMachineRunId
val returnValue: ListenableFuture<A> val returnValue: ListenableFuture<A>
/**
* Use this function for flows whose returnValue is not going to be used, so as to free up server resources.
*/
override fun close()
} }
/** /**
@ -22,4 +27,11 @@ interface FlowHandle<A> : AutoCloseable {
*/ */
interface FlowProgressHandle<A> : FlowHandle<A> { interface FlowProgressHandle<A> : FlowHandle<A> {
val progress: Observable<String> val progress: Observable<String>
/**
* Use this function for flows whose returnValue and progress are not going to be used or tracked, so as to free up
* server resources.
* Note that it won't really close if one subscribes on progress [Observable], but then forgets to unsubscribe.
*/
override fun close()
} }

View File

@ -10,7 +10,10 @@ import net.corda.core.contracts.Amount
import net.corda.core.contracts.PartyAndReference import net.corda.core.contracts.PartyAndReference
import net.corda.core.crypto.Party import net.corda.core.crypto.Party
import net.corda.core.crypto.X509Utilities import net.corda.core.crypto.X509Utilities
import net.corda.core.flows.* import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRefFactory
import net.corda.core.flows.FlowVersion
import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.RPCOps import net.corda.core.messaging.RPCOps
import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.messaging.SingleMessageRecipient
@ -43,6 +46,7 @@ import net.corda.node.services.network.PersistentNetworkMapService
import net.corda.node.services.persistence.* import net.corda.node.services.persistence.*
import net.corda.node.services.schema.HibernateObserver import net.corda.node.services.schema.HibernateObserver
import net.corda.node.services.schema.NodeSchemaService import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.services.statemachine.StateMachineManager import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.statemachine.flowVersion import net.corda.node.services.statemachine.flowVersion
import net.corda.node.services.transactions.* import net.corda.node.services.transactions.*
@ -131,7 +135,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry()) override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
override val flowLogicRefFactory: FlowLogicRefFactory get() = flowLogicFactory override val flowLogicRefFactory: FlowLogicRefFactory get() = flowLogicFactory
override fun <T> startFlow(logic: FlowLogic<T>, flowInitiator: FlowInitiator): FlowStateMachine<T> { override fun <T> startFlow(logic: FlowLogic<T>, flowInitiator: FlowInitiator): FlowStateMachineImpl<T> {
return serverThread.fetchFrom { smm.add(logic, flowInitiator) } return serverThread.fetchFrom { smm.add(logic, flowInitiator) }
} }

View File

@ -1,5 +1,7 @@
package net.corda.node.internal package net.corda.node.internal
import com.google.common.util.concurrent.ListenableFuture
import net.corda.client.rpc.notUsed
import net.corda.core.contracts.Amount import net.corda.core.contracts.Amount
import net.corda.core.contracts.ContractState import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateAndRef import net.corda.core.contracts.StateAndRef
@ -13,14 +15,15 @@ import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.StateMachineTransactionMapping import net.corda.core.node.services.StateMachineTransactionMapping
import net.corda.core.node.services.Vault import net.corda.core.node.services.Vault
import net.corda.core.serialization.CordaSerializable
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.messaging.requirePermission import net.corda.node.services.messaging.requirePermission
import net.corda.node.services.startFlowPermission import net.corda.node.services.startFlowPermission
import net.corda.node.services.statemachine.StateMachineManager import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.utilities.transaction import net.corda.node.utilities.transaction
import org.bouncycastle.asn1.x500.X500Name
import net.corda.nodeapi.CURRENT_RPC_USER import net.corda.nodeapi.CURRENT_RPC_USER
import org.bouncycastle.asn1.x500.X500Name
import org.jetbrains.exposed.sql.Database import org.jetbrains.exposed.sql.Database
import rx.Observable import rx.Observable
import java.io.InputStream import java.io.InputStream
@ -100,14 +103,20 @@ class CordaRPCOpsImpl(
override fun <T : Any> startTrackedFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowProgressHandle<T> { override fun <T : Any> startTrackedFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowProgressHandle<T> {
requirePermission(startFlowPermission(logicType)) requirePermission(startFlowPermission(logicType))
val currentUser = FlowInitiator.RPC(CURRENT_RPC_USER.get().username) val currentUser = FlowInitiator.RPC(CURRENT_RPC_USER.get().username)
return services.invokeFlowAsync(logicType, currentUser, *args).createHandle(hasProgress = true) as FlowProgressHandle<T> val stateMachine = services.invokeFlowAsync(logicType, currentUser, *args)
return FlowProgressHandleImpl(
id = stateMachine.id,
returnValue = stateMachine.resultFuture,
progress = stateMachine.logic.track()?.second ?: Observable.empty()
)
} }
// TODO: Check that this flow is annotated as being intended for RPC invocation // TODO: Check that this flow is annotated as being intended for RPC invocation
override fun <T : Any> startFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowHandle<T> { override fun <T : Any> startFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowHandle<T> {
requirePermission(startFlowPermission(logicType)) requirePermission(startFlowPermission(logicType))
val currentUser = FlowInitiator.RPC(CURRENT_RPC_USER.get().username) val currentUser = FlowInitiator.RPC(CURRENT_RPC_USER.get().username)
return services.invokeFlowAsync(logicType, currentUser, *args).createHandle(hasProgress = false) val stateMachine = services.invokeFlowAsync(logicType, currentUser, *args)
return FlowHandleImpl(id = stateMachine.id, returnValue = stateMachine.resultFuture)
} }
override fun attachmentExists(id: SecureHash): Boolean { override fun attachmentExists(id: SecureHash): Boolean {
@ -161,4 +170,31 @@ class CordaRPCOpsImpl(
} }
} }
} }
// I would prefer for [FlowProgressHandleImpl] to extend [FlowHandleImpl],
// but Kotlin doesn't allow this for data classes, not even to create
// another data class!
@CordaSerializable
private data class FlowHandleImpl<A>(
override val id: StateMachineRunId,
override val returnValue: ListenableFuture<A>) : FlowHandle<A> {
// Remember to add @Throws to FlowHandle.close if this throws an exception
override fun close() {
returnValue.cancel(false)
}
}
@CordaSerializable
private data class FlowProgressHandleImpl<A>(
override val id: StateMachineRunId,
override val returnValue: ListenableFuture<A>,
override val progress: Observable<String>) : FlowProgressHandle<A> {
// Remember to add @Throws to FlowProgressHandle.close if this throws an exception
override fun close() {
progress.notUsed()
returnValue.cancel(false)
}
}
} }

View File

@ -101,7 +101,7 @@ abstract class ServiceHubInternal : PluginServiceHub {
* Starts an already constructed flow. Note that you must be on the server thread to call this method. * Starts an already constructed flow. Note that you must be on the server thread to call this method.
* @param flowInitiator indicates who started the flow, see: [FlowInitiator]. * @param flowInitiator indicates who started the flow, see: [FlowInitiator].
*/ */
abstract fun <T> startFlow(logic: FlowLogic<T>, flowInitiator: FlowInitiator): FlowStateMachine<T> abstract fun <T> startFlow(logic: FlowLogic<T>, flowInitiator: FlowInitiator): FlowStateMachineImpl<T>
/** /**
@ -115,7 +115,7 @@ abstract class ServiceHubInternal : PluginServiceHub {
fun <T : Any> invokeFlowAsync( fun <T : Any> invokeFlowAsync(
logicType: Class<out FlowLogic<T>>, logicType: Class<out FlowLogic<T>>,
flowInitiator: FlowInitiator, flowInitiator: FlowInitiator,
vararg args: Any?): FlowStateMachine<T> { vararg args: Any?): FlowStateMachineImpl<T> {
val logicRef = flowLogicRefFactory.create(logicType, *args) val logicRef = flowLogicRefFactory.create(logicType, *args)
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
val logic = flowLogicRefFactory.toFlowLogic(logicRef) as FlowLogic<T> val logic = flowLogicRefFactory.toFlowLogic(logicRef) as FlowLogic<T>

View File

@ -6,16 +6,12 @@ import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.Strand import co.paralleluniverse.strands.Strand
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture import com.google.common.util.concurrent.SettableFuture
import net.corda.client.rpc.notUsed
import net.corda.core.ErrorOr import net.corda.core.ErrorOr
import net.corda.core.abbreviate import net.corda.core.abbreviate
import net.corda.core.crypto.Party import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.flows.* import net.corda.core.flows.*
import net.corda.core.messaging.FlowHandle
import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.random63BitValue import net.corda.core.random63BitValue
import net.corda.core.serialization.CordaSerializable
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.UntrustworthyData import net.corda.core.utilities.UntrustworthyData
@ -29,7 +25,6 @@ import org.jetbrains.exposed.sql.Transaction
import org.jetbrains.exposed.sql.transactions.TransactionManager import org.jetbrains.exposed.sql.transactions.TransactionManager
import org.slf4j.Logger import org.slf4j.Logger
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import rx.Observable
import java.lang.reflect.Modifier import java.lang.reflect.Modifier
import java.sql.Connection import java.sql.Connection
import java.sql.SQLException import java.sql.SQLException
@ -102,15 +97,6 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
logic.stateMachine = this logic.stateMachine = this
} }
override fun createHandle(hasProgress: Boolean): FlowHandle<R> = if (hasProgress)
FlowProgressHandleImpl(
id = id,
returnValue = resultFuture,
progress = logic.track()?.second ?: Observable.empty()
)
else
FlowHandleImpl(id = id, returnValue = resultFuture)
@Suspendable @Suspendable
override fun run() { override fun run() {
createTransaction() createTransaction()
@ -444,35 +430,3 @@ val Class<out FlowLogic<*>>.flowVersion: Int get() {
require(flowVersion.value > 0) { "Flow versions have to be greater or equal to 1" } require(flowVersion.value > 0) { "Flow versions have to be greater or equal to 1" }
return flowVersion.value return flowVersion.value
} }
// I would prefer for [FlowProgressHandleImpl] to extend [FlowHandleImpl],
// but Kotlin doesn't allow this for data classes, not even to create
// another data class!
@CordaSerializable
private data class FlowHandleImpl<A>(
override val id: StateMachineRunId,
override val returnValue: ListenableFuture<A>) : FlowHandle<A> {
/**
* Use this function for flows whose returnValue is not going to be used, so as to free up server resources.
*/
override fun close() {
returnValue.cancel(false)
}
}
@CordaSerializable
private data class FlowProgressHandleImpl<A>(
override val id: StateMachineRunId,
override val returnValue: ListenableFuture<A>,
override val progress: Observable<String>) : FlowProgressHandle<A> {
/**
* Use this function for flows whose returnValue and progress are not going to be used or tracked, so as to free up server resources.
* Note that it won't really close if one subscribes on progress [Observable], but then forgets to unsubscribe.
*/
override fun close() {
progress.notUsed()
returnValue.cancel(false)
}
}

View File

@ -485,7 +485,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
* *
* Note that you must be on the [executor] thread. * Note that you must be on the [executor] thread.
*/ */
fun <T> add(logic: FlowLogic<T>, flowInitiator: FlowInitiator): FlowStateMachine<T> { fun <T> add(logic: FlowLogic<T>, flowInitiator: FlowInitiator): FlowStateMachineImpl<T> {
// TODO: Check that logic has @Suspendable on its call method. // TODO: Check that logic has @Suspendable on its call method.
executor.checkOnThread() executor.checkOnThread()
// We swap out the parent transaction context as using this frequently leads to a deadlock as we wait // We swap out the parent transaction context as using this frequently leads to a deadlock as we wait

View File

@ -6,11 +6,9 @@ import net.corda.core.contracts.Amount
import net.corda.core.crypto.Party import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowInitiator import net.corda.core.flows.FlowInitiator
import net.corda.core.crypto.X509Utilities
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowStateMachine import net.corda.core.flows.FlowStateMachine
import net.corda.core.flows.StateMachineRunId import net.corda.core.flows.StateMachineRunId
import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.node.ServiceHub import net.corda.core.node.ServiceHub
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.DUMMY_PUBKEY_1 import net.corda.core.utilities.DUMMY_PUBKEY_1
@ -31,7 +29,7 @@ class InteractiveShellTest {
constructor(b: Int, c: String) : this(b.toString() + c) constructor(b: Int, c: String) : this(b.toString() + c)
constructor(amount: Amount<Currency>) : this(amount.toString()) constructor(amount: Amount<Currency>) : this(amount.toString())
constructor(pair: Pair<Amount<Currency>, SecureHash.SHA256>) : this(pair.toString()) constructor(pair: Pair<Amount<Currency>, SecureHash.SHA256>) : this(pair.toString())
constructor(party: Party) : this(party.name.toString()) constructor(party: Party) : this(party.name)
override fun call() = a override fun call() = a
} }
@ -69,7 +67,7 @@ class InteractiveShellTest {
fun flowTooManyParams() = check("b: 12, c: Yo, d: Bar", "") fun flowTooManyParams() = check("b: 12, c: Yo, d: Bar", "")
@Test @Test
fun party() = check("party: \"${someCorpLegalName}\"", someCorpLegalName.toString()) fun party() = check("party: \"$someCorpLegalName\"", someCorpLegalName)
class DummyFSM(val logic: FlowA) : FlowStateMachine<Any?> { class DummyFSM(val logic: FlowA) : FlowStateMachine<Any?> {
override fun <T : Any> sendAndReceive(receiveType: Class<T>, otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>, retrySend: Boolean): UntrustworthyData<T> { override fun <T : Any> sendAndReceive(receiveType: Class<T>, otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>, retrySend: Boolean): UntrustworthyData<T> {
@ -88,8 +86,6 @@ class InteractiveShellTest {
throw UnsupportedOperationException("not implemented") throw UnsupportedOperationException("not implemented")
} }
override fun createHandle(hasProgress: Boolean): FlowProgressHandle<Any?> = throw UnsupportedOperationException("not implemented")
override val serviceHub: ServiceHub override val serviceHub: ServiceHub
get() = throw UnsupportedOperationException() get() = throw UnsupportedOperationException()
override val logger: Logger override val logger: Logger

View File

@ -5,7 +5,6 @@ import net.corda.core.crypto.Party
import net.corda.core.flows.FlowInitiator import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRefFactory import net.corda.core.flows.FlowLogicRefFactory
import net.corda.core.flows.FlowStateMachine
import net.corda.core.node.NodeInfo import net.corda.core.node.NodeInfo
import net.corda.core.node.services.* import net.corda.core.node.services.*
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
@ -14,6 +13,7 @@ import net.corda.node.serialization.NodeClock
import net.corda.node.services.api.* import net.corda.node.services.api.*
import net.corda.node.services.messaging.MessagingService import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.schema.NodeSchemaService import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.services.statemachine.StateMachineManager import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.transactions.InMemoryTransactionVerifierService import net.corda.node.services.transactions.InMemoryTransactionVerifierService
import net.corda.testing.MOCK_IDENTITY_SERVICE import net.corda.testing.MOCK_IDENTITY_SERVICE
@ -69,7 +69,7 @@ open class MockServiceHubInternal(
override fun recordTransactions(txs: Iterable<SignedTransaction>) = recordTransactionsInternal(txStorageService, txs) override fun recordTransactions(txs: Iterable<SignedTransaction>) = recordTransactionsInternal(txStorageService, txs)
override fun <T> startFlow(logic: FlowLogic<T>, flowInitiator: FlowInitiator): FlowStateMachine<T> { override fun <T> startFlow(logic: FlowLogic<T>, flowInitiator: FlowInitiator): FlowStateMachineImpl<T> {
return smm.executor.fetchFrom { smm.add(logic, flowInitiator) } return smm.executor.fetchFrom { smm.add(logic, flowInitiator) }
} }