Interface changes for multi-threading

This commit is contained in:
Andras Slemmer
2017-09-11 12:09:32 +01:00
parent 54f901c4fe
commit c66a84bfc6
26 changed files with 377 additions and 154 deletions

View File

@ -11,13 +11,10 @@ import net.corda.core.flows.*
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.cert
import net.corda.core.internal.*
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.toX509CertHolder
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.*
import net.corda.core.node.AppServiceHub
import net.corda.core.node.NodeInfo
@ -30,6 +27,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.debug
import net.corda.core.utilities.getOrThrow
import net.corda.node.VersionInfo
import net.corda.node.internal.classloading.requireAnnotation
import net.corda.node.internal.cordapp.CordappLoader
@ -146,6 +144,7 @@ abstract class AbstractNode(config: NodeConfiguration,
protected var myNotaryIdentity: PartyAndCertificate? = null
protected lateinit var checkpointStorage: CheckpointStorage
protected lateinit var smm: StateMachineManager
private lateinit var tokenizableServices: List<Any>
protected lateinit var attachments: NodeAttachmentService
protected lateinit var inNodeNetworkMapService: NetworkMapService
protected lateinit var network: MessagingService
@ -209,18 +208,11 @@ abstract class AbstractNode(config: NodeConfiguration,
val startedImpl = initialiseDatabasePersistence(schemaService) {
val transactionStorage = makeTransactionStorage()
val stateLoader = StateLoaderImpl(transactionStorage)
val tokenizableServices = makeServices(schemaService, transactionStorage, stateLoader)
val services = makeServices(schemaService, transactionStorage, stateLoader)
saveOwnNodeInfo()
smm = StateMachineManager(services,
checkpointStorage,
serverThread,
database,
busyNodeLatch,
cordappLoader.appClassLoader)
smm = makeStateMachineManager()
val flowStarter = FlowStarterImpl(serverThread, smm)
val schedulerService = NodeSchedulerService(platformClock, this@AbstractNode.database, flowStarter, stateLoader, unfinishedSchedules = busyNodeLatch, serverThread = serverThread)
smm.tokenizableServices.addAll(tokenizableServices)
smm.tokenizableServices.add(schedulerService)
if (serverThread is ExecutorService) {
runOnStop += {
// We wait here, even though any in-flight messages should have been drained away because the
@ -233,7 +225,8 @@ abstract class AbstractNode(config: NodeConfiguration,
val rpcOps = makeRPCOps(flowStarter)
startMessagingService(rpcOps)
installCoreFlows()
installCordaServices(flowStarter)
val cordaServices = installCordaServices(flowStarter)
tokenizableServices = services + cordaServices + schedulerService
registerCordappFlows()
_services.rpcFlows += cordappLoader.cordapps.flatMap { it.rpcFlows }
FlowLogicRefFactoryImpl.classloader = cordappLoader.appClassLoader
@ -245,7 +238,7 @@ abstract class AbstractNode(config: NodeConfiguration,
_nodeReadyFuture.captureLater(registerWithNetworkMapIfConfigured())
return startedImpl.apply {
database.transaction {
smm.start()
smm.start(tokenizableServices)
// Shut down the SMM so no Fibers are scheduled.
runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) }
schedulerService.start()
@ -254,20 +247,34 @@ abstract class AbstractNode(config: NodeConfiguration,
}
}
protected open fun makeStateMachineManager(): StateMachineManager {
return StateMachineManagerImpl(
services,
checkpointStorage,
serverThread,
database,
busyNodeLatch,
cordappLoader.appClassLoader
)
}
private class ServiceInstantiationException(cause: Throwable?) : CordaException("Service Instantiation Error", cause)
private fun installCordaServices(flowStarter: FlowStarter) {
private fun installCordaServices(flowStarter: FlowStarter): List<SerializeAsToken> {
val loadedServices = cordappLoader.cordapps.flatMap { it.services }
filterServicesToInstall(loadedServices).forEach {
return filterServicesToInstall(loadedServices).mapNotNull {
try {
installCordaService(flowStarter, it)
} catch (e: NoSuchMethodException) {
log.error("${it.name}, as a Corda service, must have a constructor with a single parameter of type " +
ServiceHub::class.java.name)
null
} catch (e: ServiceInstantiationException) {
log.error("Corda service ${it.name} failed to instantiate", e.cause)
null
} catch (e: Exception) {
log.error("Unable to install Corda service ${it.name}", e)
null
}
}
}
@ -309,11 +316,11 @@ abstract class AbstractNode(config: NodeConfiguration,
return FlowHandleImpl(id = stateMachine.id, returnValue = stateMachine.resultFuture)
}
private fun <T> startFlowChecked(flow: FlowLogic<T>): FlowStateMachineImpl<T> {
private fun <T> startFlowChecked(flow: FlowLogic<T>): FlowStateMachine<T> {
val logicType = flow.javaClass
require(logicType.isAnnotationPresent(StartableByService::class.java)) { "${logicType.name} was not designed for starting by a CordaService" }
val currentUser = FlowInitiator.Service(serviceInstance.javaClass.name)
return flowStarter.startFlow(flow, currentUser)
return flowStarter.startFlow(flow, currentUser).getOrThrow()
}
override fun equals(other: Any?): Boolean {
@ -327,7 +334,7 @@ abstract class AbstractNode(config: NodeConfiguration,
override fun hashCode() = Objects.hash(serviceHub, flowStarter, serviceInstance)
}
internal fun <T : SerializeAsToken> installCordaService(flowStarter: FlowStarter, serviceClass: Class<T>): T {
private fun <T : SerializeAsToken> installCordaService(flowStarter: FlowStarter, serviceClass: Class<T>): T {
serviceClass.requireAnnotation<CordaService>()
val service = try {
val serviceContext = AppServiceHubImpl<T>(services, flowStarter)
@ -351,7 +358,6 @@ abstract class AbstractNode(config: NodeConfiguration,
throw ServiceInstantiationException(e.cause)
}
cordappServices.putInstance(serviceClass, service)
smm.tokenizableServices += service
if (service is NotaryService) handleCustomNotaryService(service)
@ -359,6 +365,12 @@ abstract class AbstractNode(config: NodeConfiguration,
return service
}
fun <T : Any> findTokenizableService(clazz: Class<T>): T? {
return tokenizableServices.firstOrNull { clazz.isAssignableFrom(it.javaClass) }?.let { uncheckedCast(it) }
}
inline fun <reified T : Any> findTokenizableService() = findTokenizableService(T::class.java)
private fun handleCustomNotaryService(service: NotaryService) {
runOnStop += service::stop
service.start()
@ -801,7 +813,7 @@ abstract class AbstractNode(config: NodeConfiguration,
}
internal class FlowStarterImpl(private val serverThread: AffinityExecutor, private val smm: StateMachineManager) : FlowStarter {
override fun <T> startFlow(logic: FlowLogic<T>, flowInitiator: FlowInitiator, ourIdentity: Party?): FlowStateMachineImpl<T> {
return serverThread.fetchFrom { smm.add(logic, flowInitiator, ourIdentity) }
override fun <T> startFlow(logic: FlowLogic<T>, flowInitiator: FlowInitiator, ourIdentity: Party?): CordaFuture<FlowStateMachine<T>> {
return serverThread.fetchFrom { smm.startFlow(logic, flowInitiator, ourIdentity) }
}
}

View File

@ -10,6 +10,7 @@ import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine
import net.corda.core.messaging.*
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
@ -18,12 +19,12 @@ import net.corda.core.node.services.vault.PageSpecification
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.node.services.vault.Sort
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.getOrThrow
import net.corda.node.services.FlowPermissions.Companion.startFlowPermission
import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.messaging.getRpcContext
import net.corda.node.services.messaging.requirePermission
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.utilities.CordaPersistence
import rx.Observable
@ -94,7 +95,7 @@ class CordaRPCOpsImpl(
return database.transaction {
val (allStateMachines, changes) = smm.track()
DataFeed(
allStateMachines.map { stateMachineInfoFromFlowLogic(it.logic) },
allStateMachines.map { stateMachineInfoFromFlowLogic(it) },
changes.map { stateMachineUpdateFromStateMachineChange(it) }
)
}
@ -146,13 +147,13 @@ class CordaRPCOpsImpl(
return FlowHandleImpl(id = stateMachine.id, returnValue = stateMachine.resultFuture)
}
private fun <T> startFlow(logicType: Class<out FlowLogic<T>>, args: Array<out Any?>): FlowStateMachineImpl<T> {
private fun <T> startFlow(logicType: Class<out FlowLogic<T>>, args: Array<out Any?>): FlowStateMachine<T> {
require(logicType.isAnnotationPresent(StartableByRPC::class.java)) { "${logicType.name} was not designed for RPC" }
val rpcContext = getRpcContext()
rpcContext.requirePermission(startFlowPermission(logicType))
val currentUser = FlowInitiator.RPC(rpcContext.currentUser.username)
// TODO RPC flows should have mapping user -> identity that should be resolved automatically on starting flow.
return flowStarter.invokeFlowAsync(logicType, currentUser, *args)
return flowStarter.invokeFlowAsync(logicType, currentUser, *args).getOrThrow()
}
override fun attachmentExists(id: SecureHash): Boolean {

View File

@ -31,11 +31,6 @@ interface StartedNode<out N : AbstractNode> {
val rpcOps: CordaRPCOps
fun dispose() = internals.stop()
fun <T : FlowLogic<*>> registerInitiatedFlow(initiatedFlowClass: Class<T>) = internals.registerInitiatedFlow(initiatedFlowClass)
/**
* Use this method to install your Corda services in your tests. This is automatically done by the node when it
* starts up for all classes it finds which are annotated with [CordaService].
*/
fun <T : SerializeAsToken> installCordaService(serviceClass: Class<T>) = internals.installCordaService(services, serviceClass)
}
class StateLoaderImpl(private val validatedTransactions: TransactionStorage) : StateLoader {

View File

@ -20,6 +20,7 @@ import net.corda.core.node.services.NetworkMapCacheBase
import net.corda.core.node.services.TransactionStorage
import net.corda.core.serialization.CordaSerializable
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.loggerFor
import net.corda.node.internal.InitiatedFlowFactory
import net.corda.node.internal.cordapp.CordappProviderInternal
@ -119,13 +120,13 @@ interface FlowStarter {
* defaults to [FlowInitiator.RPC] with username "Only For Testing".
*/
@VisibleForTesting
fun <T> startFlow(logic: FlowLogic<T>): FlowStateMachine<T> = startFlow(logic, FlowInitiator.RPC("Only For Testing"))
fun <T> startFlow(logic: FlowLogic<T>): FlowStateMachine<T> = startFlow(logic, FlowInitiator.RPC("Only For Testing")).getOrThrow()
/**
* Starts an already constructed flow. Note that you must be on the server thread to call this method.
* @param flowInitiator indicates who started the flow, see: [FlowInitiator].
*/
fun <T> startFlow(logic: FlowLogic<T>, flowInitiator: FlowInitiator, ourIdentity: Party? = null): FlowStateMachineImpl<T>
fun <T> startFlow(logic: FlowLogic<T>, flowInitiator: FlowInitiator, ourIdentity: Party? = null): CordaFuture<FlowStateMachine<T>>
/**
* Will check [logicType] and [args] against a whitelist and if acceptable then construct and initiate the flow.
@ -138,7 +139,7 @@ interface FlowStarter {
fun <T> invokeFlowAsync(
logicType: Class<out FlowLogic<T>>,
flowInitiator: FlowInitiator,
vararg args: Any?): FlowStateMachineImpl<T> {
vararg args: Any?): CordaFuture<FlowStateMachine<T>> {
val logicRef = FlowLogicRefFactoryImpl.createForRPC(logicType, *args)
val logic: FlowLogic<T> = uncheckedCast(FlowLogicRefFactoryImpl.toFlowLogic(logicRef))
return startFlow(logic, flowInitiator, ourIdentity = null)

View File

@ -1,9 +1,8 @@
package net.corda.node.services.events
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.SettableFuture as QuasarSettableFuture
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture as GuavaSettableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.contracts.SchedulableState
import net.corda.core.contracts.ScheduledActivity
import net.corda.core.contracts.ScheduledStateRef
@ -13,6 +12,7 @@ import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.until
import net.corda.core.node.StateLoader
import net.corda.core.schemas.PersistentStateRef
@ -36,6 +36,8 @@ import javax.annotation.concurrent.ThreadSafe
import javax.persistence.Column
import javax.persistence.EmbeddedId
import javax.persistence.Entity
import co.paralleluniverse.strands.SettableFuture as QuasarSettableFuture
import com.google.common.util.concurrent.SettableFuture as GuavaSettableFuture
/**
* A first pass of a simple [SchedulerService] that works with [MutableClock]s for testing, demonstrations and simulations
@ -215,7 +217,7 @@ class NodeSchedulerService(private val clock: Clock,
* cancelled then we run the scheduled action. Finally we remove that action from the scheduled actions and
* recompute the next scheduled action.
*/
internal fun rescheduleWakeUp() {
private fun rescheduleWakeUp() {
// Note, we already have the mutex but we need the scope again here
val (scheduledState, ourRescheduledFuture) = mutex.alreadyLocked {
rescheduled?.cancel(false)
@ -245,7 +247,7 @@ class NodeSchedulerService(private val clock: Clock,
val scheduledFlow = getScheduledFlow(scheduledState)
if (scheduledFlow != null) {
flowName = scheduledFlow.javaClass.name
val future = flowStarter.startFlow(scheduledFlow, FlowInitiator.Scheduled(scheduledState)).resultFuture
val future = flowStarter.startFlow(scheduledFlow, FlowInitiator.Scheduled(scheduledState)).flatMap { it.resultFuture }
future.then {
unfinishedSchedules.countDown()
}

View File

@ -83,9 +83,40 @@ interface MessagingService {
* to send an ACK message back.
*
* @param retryId if provided the message will be scheduled for redelivery until [cancelRedelivery] is called for this id.
* Note that this feature should only be used when the target is an idempotent distributed service, e.g. a notary.
* Note that this feature should only be used when the target is an idempotent distributed service, e.g. a notary.
* @param sequenceKey an object that may be used to enable a parallel [MessagingService] implementation. Two
* subsequent send()s with the same [sequenceKey] (up to equality) are guaranteed to be delivered in the same
* sequence the send()s were called. By default this is chosen conservatively to be [target].
* @param acknowledgementHandler if non-null this handler will be called once the sent message has been committed by
* the broker. Note that if specified [send] itself may return earlier than the commit.
*/
fun send(message: Message, target: MessageRecipients, retryId: Long? = null)
fun send(
message: Message,
target: MessageRecipients,
retryId: Long? = null,
sequenceKey: Any = target,
acknowledgementHandler: (() -> Unit)? = null
)
/** A message with a target and sequenceKey specified. */
data class AddressedMessage(
val message: Message,
val target: MessageRecipients,
val retryId: Long? = null,
val sequenceKey: Any = target
)
/**
* Sends a list of messages to the specified recipients. This function allows for an efficient batching
* implementation.
*
* @param addressedMessages The list of messages together with the recipients, retry ids and sequence keys.
* @param retryId if provided the message will be scheduled for redelivery until [cancelRedelivery] is called for this id.
* Note that this feature should only be used when the target is an idempotent distributed service, e.g. a notary.
* @param acknowledgementHandler if non-null this handler will be called once all sent messages have been committed
* by the broker. Note that if specified [send] itself may return earlier than the commit.
*/
fun send(addressedMessages: List<AddressedMessage>, acknowledgementHandler: (() -> Unit)? = null)
/** Cancels the scheduled message redelivery for the specified [retryId] */
fun cancelRedelivery(retryId: Long)

View File

@ -22,7 +22,7 @@ import net.corda.node.services.RPCUserService
import net.corda.node.services.api.MonitoringService
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.VerifierType
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.statemachine.StateMachineManagerImpl
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
import net.corda.node.services.transactions.OutOfProcessTransactionVerifierService
import net.corda.node.utilities.*
@ -485,7 +485,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
}
}
override fun send(message: Message, target: MessageRecipients, retryId: Long?) {
override fun send(message: Message, target: MessageRecipients, retryId: Long?, sequenceKey: Any, acknowledgementHandler: (() -> Unit)?) {
// We have to perform sending on a different thread pool, since using the same pool for messaging and
// fibers leads to Netty buffer memory leaks, caused by both Netty and Quasar fiddling with thread-locals.
messagingExecutor.fetchFrom {
@ -502,7 +502,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(message.uniqueMessageId.toString()))
// For demo purposes - if set then add a delay to messages in order to demonstrate that the flows are doing as intended
if (amqDelayMillis > 0 && message.topicSession.topic == StateMachineManager.sessionTopic.topic) {
if (amqDelayMillis > 0 && message.topicSession.topic == StateMachineManagerImpl.sessionTopic.topic) {
putLongProperty(HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + amqDelayMillis)
}
}
@ -523,6 +523,14 @@ class NodeMessagingClient(override val config: NodeConfiguration,
}
}
}
acknowledgementHandler?.invoke()
}
override fun send(addressedMessages: List<MessagingService.AddressedMessage>, acknowledgementHandler: (() -> Unit)?) {
for ((message, target, retryId, sequenceKey) in addressedMessages) {
send(message, target, retryId, sequenceKey, null)
}
acknowledgementHandler?.invoke()
}
private fun sendWithRetry(retryCount: Int, address: String, message: ClientMessage, retryId: Long) {

View File

@ -16,28 +16,46 @@ class FlowSessionImpl(
internal lateinit var sessionFlow: FlowLogic<*>
@Suspendable
override fun getCounterpartyFlowInfo(): FlowInfo {
return stateMachine.getFlowInfo(counterparty, sessionFlow)
override fun getCounterpartyFlowInfo(maySkipCheckpoint: Boolean): FlowInfo {
return stateMachine.getFlowInfo(counterparty, sessionFlow, maySkipCheckpoint)
}
@Suspendable
override fun <R : Any> sendAndReceive(receiveType: Class<R>, payload: Any): UntrustworthyData<R> {
return stateMachine.sendAndReceive(receiveType, counterparty, payload, sessionFlow)
override fun getCounterpartyFlowInfo() = getCounterpartyFlowInfo(maySkipCheckpoint = false)
@Suspendable
override fun <R : Any> sendAndReceive(
receiveType: Class<R>,
payload: Any,
maySkipCheckpoint: Boolean
): UntrustworthyData<R> {
return stateMachine.sendAndReceive(
receiveType,
counterparty,
payload,
sessionFlow,
retrySend = false,
maySkipCheckpoint = maySkipCheckpoint
)
}
@Suspendable
internal fun <R : Any> sendAndReceiveWithRetry(receiveType: Class<R>, payload: Any): UntrustworthyData<R> {
return stateMachine.sendAndReceive(receiveType, counterparty, payload, sessionFlow, retrySend = true)
override fun <R : Any> sendAndReceive(receiveType: Class<R>, payload: Any) = sendAndReceive(receiveType, payload, maySkipCheckpoint = false)
@Suspendable
override fun <R : Any> receive(receiveType: Class<R>, maySkipCheckpoint: Boolean): UntrustworthyData<R> {
return stateMachine.receive(receiveType, counterparty, sessionFlow, maySkipCheckpoint)
}
@Suspendable
override fun <R : Any> receive(receiveType: Class<R>): UntrustworthyData<R> {
return stateMachine.receive(receiveType, counterparty, sessionFlow)
override fun <R : Any> receive(receiveType: Class<R>) = receive(receiveType, maySkipCheckpoint = false)
@Suspendable
override fun send(payload: Any, maySkipCheckpoint: Boolean) {
return stateMachine.send(counterparty, payload, sessionFlow, maySkipCheckpoint)
}
@Suspendable
override fun send(payload: Any) {
return stateMachine.send(counterparty, payload, sessionFlow)
}
override fun send(payload: Any) = send(payload, maySkipCheckpoint = false)
}

View File

@ -163,7 +163,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
}
@Suspendable
override fun getFlowInfo(otherParty: Party, sessionFlow: FlowLogic<*>): FlowInfo {
override fun getFlowInfo(otherParty: Party, sessionFlow: FlowLogic<*>, maySkipCheckpoint: Boolean): FlowInfo {
val state = getConfirmedSession(otherParty, sessionFlow).state as FlowSessionState.Initiated
return state.context
}
@ -173,7 +173,8 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
otherParty: Party,
payload: Any,
sessionFlow: FlowLogic<*>,
retrySend: Boolean): UntrustworthyData<T> {
retrySend: Boolean,
maySkipCheckpoint: Boolean): UntrustworthyData<T> {
requireNonPrimitive(receiveType)
logger.debug { "sendAndReceive(${receiveType.name}, $otherParty, ${payload.toString().abbreviate(300)}) ..." }
val session = getConfirmedSessionIfPresent(otherParty, sessionFlow)
@ -192,7 +193,8 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
@Suspendable
override fun <T : Any> receive(receiveType: Class<T>,
otherParty: Party,
sessionFlow: FlowLogic<*>): UntrustworthyData<T> {
sessionFlow: FlowLogic<*>,
maySkipCheckpoint: Boolean): UntrustworthyData<T> {
requireNonPrimitive(receiveType)
logger.debug { "receive(${receiveType.name}, $otherParty) ..." }
val session = getConfirmedSession(otherParty, sessionFlow)
@ -208,7 +210,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
}
@Suspendable
override fun send(otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>) {
override fun send(otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>, maySkipCheckpoint: Boolean) {
logger.debug { "send($otherParty, ${payload.toString().abbreviate(300)})" }
val session = getConfirmedSessionIfPresent(otherParty, sessionFlow)
if (session == null) {
@ -220,7 +222,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
}
@Suspendable
override fun waitForLedgerCommit(hash: SecureHash, sessionFlow: FlowLogic<*>): SignedTransaction {
override fun waitForLedgerCommit(hash: SecureHash, sessionFlow: FlowLogic<*>, maySkipCheckpoint: Boolean): SignedTransaction {
logger.debug { "waitForLedgerCommit($hash) ..." }
suspend(WaitForLedgerCommit(hash, sessionFlow.stateMachine as FlowStateMachineImpl<*>))
val stx = serviceHub.validatedTransactions.getTransaction(hash)

View File

@ -0,0 +1,78 @@
package net.corda.node.services.statemachine
import net.corda.core.concurrent.CordaFuture
import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic
import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine
import net.corda.core.messaging.DataFeed
import net.corda.core.utilities.Try
import rx.Observable
/**
* A StateMachineManager is responsible for coordination and persistence of multiple [FlowStateMachine] objects.
* Each such object represents an instantiation of a (two-party) flow that has reached a particular point.
*
* An implementation of this interface will persist state machines to long term storage so they can survive process
* restarts and, if run with a single-threaded executor, will ensure no two state machines run concurrently with each
* other (bad for performance, good for programmer mental health!).
*
* A flow is a class with a single call method. The call method and any others it invokes are rewritten by a bytecode
* rewriting engine called Quasar, to ensure the code can be suspended and resumed at any point.
*
* TODO: Consider the issue of continuation identity more deeply: is it a safe assumption that a serialised
* continuation is always unique?
* TODO: Think about how to bring the system to a clean stop so it can be upgraded without any serialised stacks on disk
* TODO: Timeouts
* TODO: Surfacing of exceptions via an API and/or management UI
* TODO: Ability to control checkpointing explicitly, for cases where you know replaying a message can't hurt
* TODO: Don't store all active flows in memory, load from the database on demand.
*/
interface StateMachineManager {
/**
* Starts the state machine manager, loading and starting the state machines in storage.
*/
fun start(tokenizableServices: List<Any>)
/**
* Stops the state machine manager gracefully, waiting until all but [allowedUnsuspendedFiberCount] flows reach the
* next checkpoint.
*/
fun stop(allowedUnsuspendedFiberCount: Int)
/**
* Starts a new flow.
*
* @param flowLogic The flow's code.
* @param flowInitiator The initiator of the flow.
*/
fun <A> startFlow(flowLogic: FlowLogic<A>, flowInitiator: FlowInitiator, ourIdentity: Party? = null): CordaFuture<FlowStateMachine<A>>
/**
* Represents an addition/removal of a state machine.
*/
sealed class Change {
abstract val logic: FlowLogic<*>
data class Add(override val logic: FlowLogic<*>) : Change()
data class Removed(override val logic: FlowLogic<*>, val result: Try<*>) : Change()
}
/**
* Returns the list of live state machines and a stream of subsequent additions/removals of them.
*/
fun track(): DataFeed<List<FlowLogic<*>>, Change>
/**
* The stream of additions/removals of flows.
*/
val changes: Observable<Change>
/**
* Returns the currently live flows of type [flowClass], and their corresponding result future.
*/
fun <A : FlowLogic<*>> findStateMachines(flowClass: Class<A>): List<Pair<A, CordaFuture<*>>>
/**
* Returns all currently live flows.
*/
val allStateMachines: List<FlowLogic<*>>
}

View File

@ -16,6 +16,7 @@ import net.corda.core.crypto.random63BitValue
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.*
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.messaging.DataFeed
import net.corda.core.serialization.SerializationDefaults.CHECKPOINT_CONTEXT
import net.corda.core.serialization.SerializationDefaults.SERIALIZATION_FACTORY
@ -48,42 +49,24 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit.SECONDS
import javax.annotation.concurrent.ThreadSafe
import kotlin.collections.ArrayList
/**
* A StateMachineManager is responsible for coordination and persistence of multiple [FlowStateMachineImpl] objects.
* Each such object represents an instantiation of a (two-party) flow that has reached a particular point.
*
* An implementation of this class will persist state machines to long term storage so they can survive process restarts
* and, if run with a single-threaded executor, will ensure no two state machines run concurrently with each other
* (bad for performance, good for programmer mental health!).
*
* A "state machine" is a class with a single call method. The call method and any others it invokes are rewritten by
* a bytecode rewriting engine called Quasar, to ensure the code can be suspended and resumed at any point.
*
* The SMM will always invoke the flow fibers on the given [AffinityExecutor], regardless of which thread actually
* starts them via [add].
*
* TODO: Consider the issue of continuation identity more deeply: is it a safe assumption that a serialised
* continuation is always unique?
* TODO: Think about how to bring the system to a clean stop so it can be upgraded without any serialised stacks on disk
* TODO: Timeouts
* TODO: Surfacing of exceptions via an API and/or management UI
* TODO: Ability to control checkpointing explicitly, for cases where you know replaying a message can't hurt
* TODO: Don't store all active flows in memory, load from the database on demand.
* The StateMachineManagerImpl will always invoke the flow fibers on the given [AffinityExecutor], regardless of which
* thread actually starts them via [startFlow].
*/
@ThreadSafe
class StateMachineManager(val serviceHub: ServiceHubInternal,
val checkpointStorage: CheckpointStorage,
val executor: AffinityExecutor,
val database: CordaPersistence,
private val unfinishedFibers: ReusableLatch = ReusableLatch(),
private val classloader: ClassLoader = javaClass.classLoader) {
class StateMachineManagerImpl(
val serviceHub: ServiceHubInternal,
val checkpointStorage: CheckpointStorage,
val executor: AffinityExecutor,
val database: CordaPersistence,
private val unfinishedFibers: ReusableLatch = ReusableLatch(),
private val classloader: ClassLoader = StateMachineManagerImpl::class.java.classLoader
) : StateMachineManager {
inner class FiberScheduler : FiberExecutorScheduler("Same thread scheduler", executor)
companion object {
private val logger = loggerFor<StateMachineManager>()
private val logger = loggerFor<StateMachineManagerImpl>()
internal val sessionTopic = TopicSession("platform.session")
init {
@ -93,22 +76,15 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
}
}
sealed class Change {
abstract val logic: FlowLogic<*>
data class Add(override val logic: FlowLogic<*>) : Change()
data class Removed(override val logic: FlowLogic<*>, val result: Try<*>) : Change()
}
// A list of all the state machines being managed by this class. We expose snapshots of it via the stateMachines
// property.
private class InnerState {
var started = false
val stateMachines = LinkedHashMap<FlowStateMachineImpl<*>, Checkpoint>()
val changesPublisher = PublishSubject.create<Change>()!!
val changesPublisher = PublishSubject.create<StateMachineManager.Change>()!!
val fibersWaitingForLedgerCommit = HashMultimap.create<SecureHash, FlowStateMachineImpl<*>>()!!
fun notifyChangeObservers(change: Change) {
fun notifyChangeObservers(change: StateMachineManager.Change) {
changesPublisher.bufferUntilDatabaseCommit().onNext(change)
}
}
@ -139,24 +115,22 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
private val openSessions = ConcurrentHashMap<Long, FlowSessionInternal>()
private val recentlyClosedSessions = ConcurrentHashMap<Long, Party>()
internal val tokenizableServices = ArrayList<Any>()
// Context for tokenized services in checkpoints
private lateinit var tokenizableServices: List<Any>
private val serializationContext by lazy {
SerializeAsTokenContextImpl(tokenizableServices, SERIALIZATION_FACTORY, CHECKPOINT_CONTEXT, serviceHub)
}
fun findServices(predicate: (Any) -> Boolean) = tokenizableServices.filter(predicate)
/** Returns a list of all state machines executing the given flow logic at the top level (subflows do not count) */
fun <P : FlowLogic<T>, T> findStateMachines(flowClass: Class<P>): List<Pair<P, CordaFuture<T>>> {
override fun <A : FlowLogic<*>> findStateMachines(flowClass: Class<A>): List<Pair<A, CordaFuture<*>>> {
return mutex.locked {
stateMachines.keys.mapNotNull {
flowClass.castIfPossible(it.logic)?.let { it to uncheckedCast<FlowStateMachine<*>, FlowStateMachineImpl<T>>(it.stateMachine).resultFuture }
flowClass.castIfPossible(it.logic)?.let { it to uncheckedCast<FlowStateMachine<*>, FlowStateMachineImpl<*>>(it.stateMachine).resultFuture }
}
}
}
val allStateMachines: List<FlowLogic<*>>
override val allStateMachines: List<FlowLogic<*>>
get() = mutex.locked { stateMachines.keys.map { it.logic } }
/**
@ -165,9 +139,10 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
*
* We use assignment here so that multiple subscribers share the same wrapped Observable.
*/
val changes: Observable<Change> = mutex.content.changesPublisher.wrapWithDatabaseTransaction()
override val changes: Observable<StateMachineManager.Change> = mutex.content.changesPublisher.wrapWithDatabaseTransaction()
fun start() {
override fun start(tokenizableServices: List<Any>) {
this.tokenizableServices = tokenizableServices
checkQuasarJavaAgentPresence()
restoreFibersFromCheckpoints()
listenToLedgerTransactions()
@ -207,12 +182,12 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
}
/**
* Start the shutdown process, bringing the [StateMachineManager] to a controlled stop. When this method returns,
* Start the shutdown process, bringing the [StateMachineManagerImpl] to a controlled stop. When this method returns,
* all Fibers have been suspended and checkpointed, or have completed.
*
* @param allowedUnsuspendedFiberCount Optional parameter is used in some tests.
*/
fun stop(allowedUnsuspendedFiberCount: Int = 0) {
override fun stop(allowedUnsuspendedFiberCount: Int) {
require(allowedUnsuspendedFiberCount >= 0)
mutex.locked {
if (stopping) throw IllegalStateException("Already stopping!")
@ -229,9 +204,9 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
* Atomic get snapshot + subscribe. This is needed so we don't miss updates between subscriptions to [changes] and
* calls to [allStateMachines]
*/
fun track(): DataFeed<List<FlowStateMachineImpl<*>>, Change> {
override fun track(): DataFeed<List<FlowLogic<*>>, StateMachineManager.Change> {
return mutex.locked {
DataFeed(stateMachines.keys.toList(), changesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction())
DataFeed(stateMachines.keys.map { it.logic }, changesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction())
}
}
@ -460,7 +435,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
try {
mutex.locked {
stateMachines.remove(fiber)?.let { checkpointStorage.removeCheckpoint(it) }
notifyChangeObservers(Change.Removed(fiber.logic, result))
notifyChangeObservers(StateMachineManager.Change.Removed(fiber.logic, result))
}
endAllFiberSessions(fiber, result, propagated)
} finally {
@ -473,7 +448,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
mutex.locked {
totalStartedFlows.inc()
unfinishedFibers.countUp()
notifyChangeObservers(Change.Add(fiber.logic))
notifyChangeObservers(StateMachineManager.Change.Add(fiber.logic))
}
}
@ -526,11 +501,11 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
*
* Note that you must be on the [executor] thread.
*/
fun <T> add(logic: FlowLogic<T>, flowInitiator: FlowInitiator, ourIdentity: Party? = null): FlowStateMachineImpl<T> {
override fun <A> startFlow(flowLogic: FlowLogic<A>, flowInitiator: FlowInitiator, ourIdentity: Party?): CordaFuture<FlowStateMachine<A>> {
// TODO: Check that logic has @Suspendable on its call method.
executor.checkOnThread()
val fiber = database.transaction {
val fiber = createFiber(logic, flowInitiator, ourIdentity)
val fiber = createFiber(flowLogic, flowInitiator, ourIdentity)
updateCheckpoint(fiber)
fiber
}
@ -540,7 +515,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
resumeFiber(fiber)
}
}
return fiber
return doneFuture(fiber)
}
private fun updateCheckpoint(fiber: FlowStateMachineImpl<*>) {

View File

@ -20,6 +20,7 @@ import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.loggerFor
import net.corda.node.internal.Node
import net.corda.node.internal.StartedNode
@ -234,7 +235,7 @@ object InteractiveShell {
val clazz: Class<FlowLogic<*>> = uncheckedCast(matches.single())
try {
// TODO Flow invocation should use startFlowDynamic.
val fsm = runFlowFromString({ node.services.startFlow(it, FlowInitiator.Shell) }, inputData, clazz)
val fsm = runFlowFromString({ node.services.startFlow(it, FlowInitiator.Shell).getOrThrow() }, inputData, clazz)
// Show the progress tracker on the console until the flow completes or is interrupted with a
// Ctrl-C keypress.
val latch = CountDownLatch(1)

View File

@ -52,7 +52,7 @@ class InteractiveShellTest {
private fun check(input: String, expected: String) {
var output: DummyFSM? = null
InteractiveShell.runFlowFromString({ DummyFSM(it as FlowA).apply { output = this } }, input, FlowA::class.java, om)
assertEquals(expected, output!!.flowA.a, input)
assertEquals(expected, output!!.logic.a, input)
}
@Test
@ -83,5 +83,5 @@ class InteractiveShellTest {
@Test
fun party() = check("party: \"${MEGA_CORP.name}\"", MEGA_CORP.name.toString())
class DummyFSM(val flowA: FlowA) : FlowStateMachine<Any?> by rigorousMock()
class DummyFSM(override val logic: FlowA) : FlowStateMachine<Any?> by rigorousMock()
}

View File

@ -27,6 +27,7 @@ import net.corda.node.services.network.NetworkMapCacheImpl
import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.statemachine.StateMachineManagerImpl
import net.corda.node.services.vault.NodeVaultService
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.CordaPersistence
@ -113,14 +114,14 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
doReturn(this@NodeSchedulerServiceTest).whenever(it).testReference
}
smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1)
mockSMM = StateMachineManager(services, DBCheckpointStorage(), smmExecutor, database)
mockSMM = StateMachineManagerImpl(services, DBCheckpointStorage(), smmExecutor, database)
scheduler = NodeSchedulerService(testClock, database, FlowStarterImpl(smmExecutor, mockSMM), stateLoader, schedulerGatedExecutor, serverThread = smmExecutor)
mockSMM.changes.subscribe { change ->
if (change is StateMachineManager.Change.Removed && mockSMM.allStateMachines.isEmpty()) {
smmHasRemovedAllFlows.countDown()
}
}
mockSMM.start()
mockSMM.start(emptyList())
scheduler.start()
}
}

View File

@ -64,6 +64,10 @@ class FlowFrameworkTests {
private lateinit var alice: Party
private lateinit var bob: Party
private fun StartedNode<*>.flushSmm() {
(this.smm as StateMachineManagerImpl).executor.flush()
}
@Before
fun start() {
mockNet = MockNetwork(servicePeerAllocationStrategy = RoundRobin(), cordappPackages = listOf("net.corda.finance.contracts", "net.corda.testing.contracts"))
@ -166,7 +170,7 @@ class FlowFrameworkTests {
val restoredFlow = charlieNode.getSingleFlow<NoOpFlow>().first
assertEquals(false, restoredFlow.flowStarted) // Not started yet as no network activity has been allowed yet
mockNet.runNetwork() // Allow network map messages to flow
charlieNode.smm.executor.flush()
charlieNode.flushSmm()
assertEquals(true, restoredFlow.flowStarted) // Now we should have run the flow and hopefully cleared the init checkpoint
charlieNode.internals.disableDBCloseOnStop()
charlieNode.services.networkMapCache.clearNetworkMapCache() // zap persisted NetworkMapCache to force use of network.
@ -175,7 +179,7 @@ class FlowFrameworkTests {
// Now it is completed the flow should leave no Checkpoint.
charlieNode = mockNet.createNode(charlieNode.internals.id)
mockNet.runNetwork() // Allow network map messages to flow
charlieNode.smm.executor.flush()
charlieNode.flushSmm()
assertTrue(charlieNode.smm.findStateMachines(NoOpFlow::class.java).isEmpty())
}
@ -184,7 +188,7 @@ class FlowFrameworkTests {
aliceNode.registerFlowFactory(ReceiveFlow::class) { InitiatedSendFlow("Hello", it) }
bobNode.services.startFlow(ReceiveFlow(alice).nonTerminating()) // Prepare checkpointed receive flow
// Make sure the add() has finished initial processing.
bobNode.smm.executor.flush()
bobNode.flushSmm()
bobNode.internals.disableDBCloseOnStop()
bobNode.dispose() // kill receiver
val restoredFlow = bobNode.restartAndGetRestoredFlow<ReceiveFlow>()
@ -210,7 +214,7 @@ class FlowFrameworkTests {
assertEquals(1, bobNode.checkpointStorage.checkpoints().size)
}
// Make sure the add() has finished initial processing.
bobNode.smm.executor.flush()
bobNode.flushSmm()
bobNode.internals.disableDBCloseOnStop()
// Restart node and thus reload the checkpoint and resend the message with same UUID
bobNode.dispose()
@ -223,7 +227,7 @@ class FlowFrameworkTests {
val (firstAgain, fut1) = node2b.getSingleFlow<PingPongFlow>()
// Run the network which will also fire up the second flow. First message should get deduped. So message data stays in sync.
mockNet.runNetwork()
node2b.smm.executor.flush()
node2b.flushSmm()
fut1.getOrThrow()
val receivedCount = receivedSessionMessages.count { it.isPayloadTransfer }
@ -731,7 +735,7 @@ class FlowFrameworkTests {
private fun StartedNode<*>.sendSessionMessage(message: SessionMessage, destination: Party) {
services.networkService.apply {
val address = getAddressOfParty(PartyInfo.SingleNode(destination, emptyList()))
send(createMessage(StateMachineManager.sessionTopic, message.serialize().bytes), address)
send(createMessage(StateMachineManagerImpl.sessionTopic, message.serialize().bytes), address)
}
}
@ -755,7 +759,7 @@ class FlowFrameworkTests {
}
private fun Observable<MessageTransfer>.toSessionTransfers(): Observable<SessionTransfer> {
return filter { it.message.topicSession == StateMachineManager.sessionTopic }.map {
return filter { it.message.topicSession == StateMachineManagerImpl.sessionTopic }.map {
val from = it.sender.id
val message = it.message.data.deserialize<SessionMessage>()
SessionTransfer(from, sanitise(message), it.recipients)