Add information on why state machine was removed from StateMachineManager (#570)

* Add information on why state machine was removed from StateMachineManager.
There are two cases: normal end of flow or error.

Return flow result as part of state machine remove data.

Make Change a sealed class with Add and Remove.

fiber.actionOnEnd takes ErrorOr<R> parameter.

* Remove unnecessary fields from StateMachineManager.Change.
This commit is contained in:
Katarzyna Streich 2017-04-25 14:34:45 +01:00 committed by GitHub
parent b597f05bd4
commit f92949d3b5
10 changed files with 39 additions and 37 deletions

View File

@ -1,6 +1,7 @@
package net.corda.core.messaging package net.corda.core.messaging
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.ErrorOr
import net.corda.core.contracts.Amount import net.corda.core.contracts.Amount
import net.corda.core.contracts.ContractState import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateAndRef import net.corda.core.contracts.StateAndRef
@ -41,7 +42,7 @@ sealed class StateMachineUpdate {
override val id: StateMachineRunId get() = stateMachineInfo.id override val id: StateMachineRunId get() = stateMachineInfo.id
} }
data class Removed(override val id: StateMachineRunId) : StateMachineUpdate() data class Removed(override val id: StateMachineRunId, val result: ErrorOr<*>) : StateMachineUpdate()
} }
/** /**

View File

@ -18,7 +18,6 @@ import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.messaging.requirePermission import net.corda.node.services.messaging.requirePermission
import net.corda.node.services.startFlowPermission import net.corda.node.services.startFlowPermission
import net.corda.node.services.statemachine.StateMachineManager import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.utilities.AddOrRemove
import net.corda.node.utilities.transaction import net.corda.node.utilities.transaction
import org.bouncycastle.asn1.x500.X500Name import org.bouncycastle.asn1.x500.X500Name
import net.corda.nodeapi.CURRENT_RPC_USER import net.corda.nodeapi.CURRENT_RPC_USER
@ -63,7 +62,7 @@ class CordaRPCOpsImpl(
return database.transaction { return database.transaction {
val (allStateMachines, changes) = smm.track() val (allStateMachines, changes) = smm.track()
Pair( Pair(
allStateMachines.map { stateMachineInfoFromFlowLogic(it.id, it.logic, it.flowInitiator) }, allStateMachines.map { stateMachineInfoFromFlowLogic(it.logic) },
changes.map { stateMachineUpdateFromStateMachineChange(it) } changes.map { stateMachineUpdateFromStateMachineChange(it) }
) )
} }
@ -151,14 +150,14 @@ class CordaRPCOpsImpl(
override fun registeredFlows(): List<String> = services.flowLogicRefFactory.flowWhitelist.keys.sorted() override fun registeredFlows(): List<String> = services.flowLogicRefFactory.flowWhitelist.keys.sorted()
companion object { companion object {
private fun stateMachineInfoFromFlowLogic(id: StateMachineRunId, flowLogic: FlowLogic<*>, flowInitiator: FlowInitiator): StateMachineInfo { private fun stateMachineInfoFromFlowLogic(flowLogic: FlowLogic<*>): StateMachineInfo {
return StateMachineInfo(id, flowLogic.javaClass.name, flowInitiator, flowLogic.track()) return StateMachineInfo(flowLogic.runId, flowLogic.javaClass.name, flowLogic.stateMachine.flowInitiator, flowLogic.track())
} }
private fun stateMachineUpdateFromStateMachineChange(change: StateMachineManager.Change): StateMachineUpdate { private fun stateMachineUpdateFromStateMachineChange(change: StateMachineManager.Change): StateMachineUpdate {
return when (change.addOrRemove) { return when (change) {
AddOrRemove.ADD -> StateMachineUpdate.Added(stateMachineInfoFromFlowLogic(change.id, change.logic, change.flowInitiator)) is StateMachineManager.Change.Add -> StateMachineUpdate.Added(stateMachineInfoFromFlowLogic(change.logic))
AddOrRemove.REMOVE -> StateMachineUpdate.Removed(change.id) is StateMachineManager.Change.Removed -> StateMachineUpdate.Removed(change.logic.runId, change.result)
} }
} }
} }

View File

@ -7,6 +7,7 @@ import co.paralleluniverse.strands.Strand
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture import com.google.common.util.concurrent.SettableFuture
import net.corda.client.rpc.notUsed import net.corda.client.rpc.notUsed
import net.corda.core.ErrorOr
import net.corda.core.abbreviate import net.corda.core.abbreviate
import net.corda.core.crypto.Party import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
@ -77,7 +78,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
@Transient override lateinit var serviceHub: ServiceHubInternal @Transient override lateinit var serviceHub: ServiceHubInternal
@Transient internal lateinit var database: Database @Transient internal lateinit var database: Database
@Transient internal lateinit var actionOnSuspend: (FlowIORequest) -> Unit @Transient internal lateinit var actionOnSuspend: (FlowIORequest) -> Unit
@Transient internal lateinit var actionOnEnd: (Throwable?, Boolean) -> Unit @Transient internal lateinit var actionOnEnd: (ErrorOr<R>, Boolean) -> Unit
@Transient internal var fromCheckpoint: Boolean = false @Transient internal var fromCheckpoint: Boolean = false
@Transient private var txTrampoline: Transaction? = null @Transient private var txTrampoline: Transaction? = null
@ -145,7 +146,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
.filter { it.state is FlowSessionState.Initiating } .filter { it.state is FlowSessionState.Initiating }
.forEach { it.waitForConfirmation() } .forEach { it.waitForConfirmation() }
// This is to prevent actionOnEnd being called twice if it throws an exception // This is to prevent actionOnEnd being called twice if it throws an exception
actionOnEnd(null, false) actionOnEnd(ErrorOr(result), false)
_resultFuture?.set(result) _resultFuture?.set(result)
logic.progressTracker?.currentStep = ProgressTracker.DONE logic.progressTracker?.currentStep = ProgressTracker.DONE
logger.debug { "Flow finished with result $result" } logger.debug { "Flow finished with result $result" }
@ -158,7 +159,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
} }
private fun processException(exception: Throwable, propagated: Boolean) { private fun processException(exception: Throwable, propagated: Boolean) {
actionOnEnd(exception, propagated) actionOnEnd(ErrorOr.of(exception), propagated)
_resultFuture?.setException(exception) _resultFuture?.setException(exception)
logic.progressTracker?.endWithError(exception) logic.progressTracker?.endWithError(exception)
} }

View File

@ -13,6 +13,7 @@ import com.esotericsoftware.kryo.pool.KryoPool
import com.google.common.collect.HashMultimap import com.google.common.collect.HashMultimap
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import io.requery.util.CloseableIterator import io.requery.util.CloseableIterator
import net.corda.core.ErrorOr
import net.corda.core.ThreadBox import net.corda.core.ThreadBox
import net.corda.core.bufferUntilSubscribed import net.corda.core.bufferUntilSubscribed
import net.corda.core.crypto.Party import net.corda.core.crypto.Party
@ -111,12 +112,12 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
val scheduler = FiberScheduler() val scheduler = FiberScheduler()
data class Change( sealed class Change {
val logic: FlowLogic<*>, abstract val logic: FlowLogic<*>
val addOrRemove: AddOrRemove,
val id: StateMachineRunId, data class Add(override val logic: FlowLogic<*>): Change()
val flowInitiator: FlowInitiator data class Removed(override val logic: FlowLogic<*>, val result: ErrorOr<*>): Change()
) }
// A list of all the state machines being managed by this class. We expose snapshots of it via the stateMachines // A list of all the state machines being managed by this class. We expose snapshots of it via the stateMachines
// property. // property.
@ -126,8 +127,8 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
val changesPublisher = PublishSubject.create<Change>()!! val changesPublisher = PublishSubject.create<Change>()!!
val fibersWaitingForLedgerCommit = HashMultimap.create<SecureHash, FlowStateMachineImpl<*>>()!! val fibersWaitingForLedgerCommit = HashMultimap.create<SecureHash, FlowStateMachineImpl<*>>()!!
fun notifyChangeObservers(fiber: FlowStateMachineImpl<*>, addOrRemove: AddOrRemove) { fun notifyChangeObservers(change: Change) {
changesPublisher.bufferUntilDatabaseCommit().onNext(Change(fiber.logic, addOrRemove, fiber.id, fiber.flowInitiator)) changesPublisher.bufferUntilDatabaseCommit().onNext(change)
} }
} }
@ -416,13 +417,13 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
processIORequest(ioRequest) processIORequest(ioRequest)
decrementLiveFibers() decrementLiveFibers()
} }
fiber.actionOnEnd = { exception, propagated -> fiber.actionOnEnd = { resultOrError, propagated ->
try { try {
mutex.locked { mutex.locked {
stateMachines.remove(fiber)?.let { checkpointStorage.removeCheckpoint(it) } stateMachines.remove(fiber)?.let { checkpointStorage.removeCheckpoint(it) }
notifyChangeObservers(fiber, AddOrRemove.REMOVE) notifyChangeObservers(Change.Removed(fiber.logic, resultOrError))
} }
endAllFiberSessions(fiber, exception, propagated) endAllFiberSessions(fiber, resultOrError.error, propagated)
} finally { } finally {
fiber.commitTransaction() fiber.commitTransaction()
decrementLiveFibers() decrementLiveFibers()
@ -433,7 +434,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
mutex.locked { mutex.locked {
totalStartedFlows.inc() totalStartedFlows.inc()
unfinishedFibers.countUp() unfinishedFibers.countUp()
notifyChangeObservers(fiber, AddOrRemove.ADD) notifyChangeObservers(Change.Add(fiber.logic))
} }
} }

View File

@ -8,7 +8,6 @@ import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.trace import net.corda.core.utilities.trace
import net.corda.node.services.statemachine.FlowStateMachineImpl import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.services.statemachine.StateMachineManager import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.utilities.AddOrRemove
import java.util.* import java.util.*
class VaultSoftLockManager(val vault: VaultService, smm: StateMachineManager) { class VaultSoftLockManager(val vault: VaultService, smm: StateMachineManager) {
@ -18,10 +17,10 @@ class VaultSoftLockManager(val vault: VaultService, smm: StateMachineManager) {
} }
init { init {
smm.changes.subscribe { (logic, addOrRemove, id) -> smm.changes.subscribe { change ->
if (addOrRemove == AddOrRemove.REMOVE && (FlowStateMachineImpl.currentStateMachine())?.hasSoftLockedStates == true) { if (change is StateMachineManager.Change.Removed && (FlowStateMachineImpl.currentStateMachine())?.hasSoftLockedStates == true) {
log.trace { "$addOrRemove Flow name ${logic.javaClass} with id $id" } log.trace { "Remove flow name ${change.logic.javaClass} with id $change.id" }
unregisterSoftLocks(id, logic) unregisterSoftLocks(change.logic.runId, change.logic)
} }
} }

View File

@ -1,5 +1,6 @@
package net.corda.node.utilities package net.corda.node.utilities
import net.corda.core.ErrorOr
import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.CordaSerializable
/** /**

View File

@ -14,7 +14,6 @@ import net.corda.node.services.MockServiceHubInternal
import net.corda.node.services.persistence.DBCheckpointStorage import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.node.services.statemachine.StateMachineManager import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.vault.NodeVaultService import net.corda.node.services.vault.NodeVaultService
import net.corda.node.utilities.AddOrRemove
import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.configureDatabase import net.corda.node.utilities.configureDatabase
import net.corda.node.utilities.transaction import net.corda.node.utilities.transaction
@ -89,7 +88,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1) smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1)
val mockSMM = StateMachineManager(services, listOf(services, scheduler), DBCheckpointStorage(), smmExecutor, database) val mockSMM = StateMachineManager(services, listOf(services, scheduler), DBCheckpointStorage(), smmExecutor, database)
mockSMM.changes.subscribe { change -> mockSMM.changes.subscribe { change ->
if (change.addOrRemove == AddOrRemove.REMOVE && mockSMM.allStateMachines.isEmpty()) { if (change is StateMachineManager.Change.Removed && mockSMM.allStateMachines.isEmpty()) {
smmHasRemovedAllFlows.countDown() smmHasRemovedAllFlows.countDown()
} }
} }

View File

@ -13,6 +13,7 @@ import net.corda.core.node.services.linearHeadsOfType
import net.corda.core.utilities.DUMMY_NOTARY import net.corda.core.utilities.DUMMY_NOTARY
import net.corda.flows.FinalityFlow import net.corda.flows.FinalityFlow
import net.corda.node.services.network.NetworkMapService import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.transactions.ValidatingNotaryService import net.corda.node.services.transactions.ValidatingNotaryService
import net.corda.node.utilities.AddOrRemove import net.corda.node.utilities.AddOrRemove
import net.corda.node.utilities.transaction import net.corda.node.utilities.transaction
@ -117,8 +118,8 @@ class ScheduledFlowTests {
val stateMachines = nodeA.smm.track() val stateMachines = nodeA.smm.track()
var countScheduledFlows = 0 var countScheduledFlows = 0
stateMachines.second.subscribe { stateMachines.second.subscribe {
if (it.addOrRemove == AddOrRemove.ADD) { if (it is StateMachineManager.Change.Add) {
val initiator = it.flowInitiator val initiator = it.logic.stateMachine.flowInitiator
if (initiator is FlowInitiator.Scheduled) if (initiator is FlowInitiator.Scheduled)
countScheduledFlows++ countScheduledFlows++
} }

View File

@ -17,8 +17,8 @@ import net.corda.core.utilities.ProgressTracker
import net.corda.irs.api.NodeInterestRates import net.corda.irs.api.NodeInterestRates
import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.network.NetworkMapService import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.transactions.SimpleNotaryService import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.node.utilities.AddOrRemove
import net.corda.node.utilities.transaction import net.corda.node.utilities.transaction
import net.corda.testing.TestNodeConfiguration import net.corda.testing.TestNodeConfiguration
import net.corda.testing.node.InMemoryMessagingNetwork import net.corda.testing.node.InMemoryMessagingNetwork
@ -240,7 +240,7 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
protected fun showProgressFor(nodes: List<SimulatedNode>) { protected fun showProgressFor(nodes: List<SimulatedNode>) {
nodes.forEach { node -> nodes.forEach { node ->
node.smm.changes.filter { it.addOrRemove == AddOrRemove.ADD }.subscribe { node.smm.changes.filter { it is StateMachineManager.Change.Add }.subscribe {
linkFlowProgress(node, it.logic) linkFlowProgress(node, it.logic)
} }
} }
@ -257,7 +257,7 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
protected fun showConsensusFor(nodes: List<SimulatedNode>) { protected fun showConsensusFor(nodes: List<SimulatedNode>) {
val node = nodes.first() val node = nodes.first()
node.smm.changes.filter { it.addOrRemove == net.corda.node.utilities.AddOrRemove.ADD }.first().subscribe { node.smm.changes.filter { it is StateMachineManager.Change.Add }.first().subscribe {
linkConsensus(nodes, it.logic) linkConsensus(nodes, it.logic)
} }
} }

View File

@ -20,7 +20,7 @@ import net.corda.node.internal.AbstractNode
import net.corda.node.internal.NetworkMapInfo import net.corda.node.internal.NetworkMapInfo
import net.corda.node.services.config.* import net.corda.node.services.config.*
import net.corda.node.services.statemachine.FlowStateMachineImpl import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.utilities.AddOrRemove.ADD import net.corda.node.services.statemachine.StateMachineManager
import net.corda.nodeapi.User import net.corda.nodeapi.User
import net.corda.nodeapi.config.SSLConfiguration import net.corda.nodeapi.config.SSLConfiguration
import net.corda.testing.node.MockIdentityService import net.corda.testing.node.MockIdentityService
@ -146,7 +146,7 @@ fun getFreeLocalPorts(hostName: String, numberToAlloc: Int): List<HostAndPort> {
inline fun <reified P : FlowLogic<*>> AbstractNode.initiateSingleShotFlow( inline fun <reified P : FlowLogic<*>> AbstractNode.initiateSingleShotFlow(
markerClass: KClass<out FlowLogic<*>>, markerClass: KClass<out FlowLogic<*>>,
noinline flowFactory: (Party) -> P): ListenableFuture<P> { noinline flowFactory: (Party) -> P): ListenableFuture<P> {
val future = smm.changes.filter { it.addOrRemove == ADD && it.logic is P }.map { it.logic as P }.toFuture() val future = smm.changes.filter { it is StateMachineManager.Change.Add && it.logic is P }.map { it.logic as P }.toFuture()
services.registerFlowInitiator(markerClass.java, flowFactory) services.registerFlowInitiator(markerClass.java, flowFactory)
return future return future
} }