diff --git a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt index d75cc26e20..4381d17de3 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt @@ -1,6 +1,7 @@ package net.corda.core.messaging import com.google.common.util.concurrent.ListenableFuture +import net.corda.core.ErrorOr import net.corda.core.contracts.Amount import net.corda.core.contracts.ContractState import net.corda.core.contracts.StateAndRef @@ -41,7 +42,7 @@ sealed class StateMachineUpdate { override val id: StateMachineRunId get() = stateMachineInfo.id } - data class Removed(override val id: StateMachineRunId) : StateMachineUpdate() + data class Removed(override val id: StateMachineRunId, val result: ErrorOr<*>) : StateMachineUpdate() } /** diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index c0e567c6f8..80f9c8fe49 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -18,7 +18,6 @@ import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.messaging.requirePermission import net.corda.node.services.startFlowPermission import net.corda.node.services.statemachine.StateMachineManager -import net.corda.node.utilities.AddOrRemove import net.corda.node.utilities.transaction import org.bouncycastle.asn1.x500.X500Name import net.corda.nodeapi.CURRENT_RPC_USER @@ -63,7 +62,7 @@ class CordaRPCOpsImpl( return database.transaction { val (allStateMachines, changes) = smm.track() Pair( - allStateMachines.map { stateMachineInfoFromFlowLogic(it.id, it.logic, it.flowInitiator) }, + allStateMachines.map { stateMachineInfoFromFlowLogic(it.logic) }, changes.map { stateMachineUpdateFromStateMachineChange(it) } ) } @@ -151,14 +150,14 @@ class CordaRPCOpsImpl( override fun registeredFlows(): List = services.flowLogicRefFactory.flowWhitelist.keys.sorted() companion object { - private fun stateMachineInfoFromFlowLogic(id: StateMachineRunId, flowLogic: FlowLogic<*>, flowInitiator: FlowInitiator): StateMachineInfo { - return StateMachineInfo(id, flowLogic.javaClass.name, flowInitiator, flowLogic.track()) + private fun stateMachineInfoFromFlowLogic(flowLogic: FlowLogic<*>): StateMachineInfo { + return StateMachineInfo(flowLogic.runId, flowLogic.javaClass.name, flowLogic.stateMachine.flowInitiator, flowLogic.track()) } private fun stateMachineUpdateFromStateMachineChange(change: StateMachineManager.Change): StateMachineUpdate { - return when (change.addOrRemove) { - AddOrRemove.ADD -> StateMachineUpdate.Added(stateMachineInfoFromFlowLogic(change.id, change.logic, change.flowInitiator)) - AddOrRemove.REMOVE -> StateMachineUpdate.Removed(change.id) + return when (change) { + is StateMachineManager.Change.Add -> StateMachineUpdate.Added(stateMachineInfoFromFlowLogic(change.logic)) + is StateMachineManager.Change.Removed -> StateMachineUpdate.Removed(change.logic.runId, change.result) } } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 00a89df830..9089d3ac72 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -7,6 +7,7 @@ import co.paralleluniverse.strands.Strand import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.SettableFuture import net.corda.client.rpc.notUsed +import net.corda.core.ErrorOr import net.corda.core.abbreviate import net.corda.core.crypto.Party import net.corda.core.crypto.SecureHash @@ -77,7 +78,7 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, @Transient override lateinit var serviceHub: ServiceHubInternal @Transient internal lateinit var database: Database @Transient internal lateinit var actionOnSuspend: (FlowIORequest) -> Unit - @Transient internal lateinit var actionOnEnd: (Throwable?, Boolean) -> Unit + @Transient internal lateinit var actionOnEnd: (ErrorOr, Boolean) -> Unit @Transient internal var fromCheckpoint: Boolean = false @Transient private var txTrampoline: Transaction? = null @@ -145,7 +146,7 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, .filter { it.state is FlowSessionState.Initiating } .forEach { it.waitForConfirmation() } // This is to prevent actionOnEnd being called twice if it throws an exception - actionOnEnd(null, false) + actionOnEnd(ErrorOr(result), false) _resultFuture?.set(result) logic.progressTracker?.currentStep = ProgressTracker.DONE logger.debug { "Flow finished with result $result" } @@ -158,7 +159,7 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, } private fun processException(exception: Throwable, propagated: Boolean) { - actionOnEnd(exception, propagated) + actionOnEnd(ErrorOr.of(exception), propagated) _resultFuture?.setException(exception) logic.progressTracker?.endWithError(exception) } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index d3303a7e65..811af28e35 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -13,6 +13,7 @@ import com.esotericsoftware.kryo.pool.KryoPool import com.google.common.collect.HashMultimap import com.google.common.util.concurrent.ListenableFuture import io.requery.util.CloseableIterator +import net.corda.core.ErrorOr import net.corda.core.ThreadBox import net.corda.core.bufferUntilSubscribed import net.corda.core.crypto.Party @@ -111,12 +112,12 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val scheduler = FiberScheduler() - data class Change( - val logic: FlowLogic<*>, - val addOrRemove: AddOrRemove, - val id: StateMachineRunId, - val flowInitiator: FlowInitiator - ) + sealed class Change { + abstract val logic: FlowLogic<*> + + data class Add(override val logic: FlowLogic<*>): Change() + data class Removed(override val logic: FlowLogic<*>, val result: ErrorOr<*>): Change() + } // A list of all the state machines being managed by this class. We expose snapshots of it via the stateMachines // property. @@ -126,8 +127,8 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val changesPublisher = PublishSubject.create()!! val fibersWaitingForLedgerCommit = HashMultimap.create>()!! - fun notifyChangeObservers(fiber: FlowStateMachineImpl<*>, addOrRemove: AddOrRemove) { - changesPublisher.bufferUntilDatabaseCommit().onNext(Change(fiber.logic, addOrRemove, fiber.id, fiber.flowInitiator)) + fun notifyChangeObservers(change: Change) { + changesPublisher.bufferUntilDatabaseCommit().onNext(change) } } @@ -416,13 +417,13 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, processIORequest(ioRequest) decrementLiveFibers() } - fiber.actionOnEnd = { exception, propagated -> + fiber.actionOnEnd = { resultOrError, propagated -> try { mutex.locked { stateMachines.remove(fiber)?.let { checkpointStorage.removeCheckpoint(it) } - notifyChangeObservers(fiber, AddOrRemove.REMOVE) + notifyChangeObservers(Change.Removed(fiber.logic, resultOrError)) } - endAllFiberSessions(fiber, exception, propagated) + endAllFiberSessions(fiber, resultOrError.error, propagated) } finally { fiber.commitTransaction() decrementLiveFibers() @@ -433,7 +434,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, mutex.locked { totalStartedFlows.inc() unfinishedFibers.countUp() - notifyChangeObservers(fiber, AddOrRemove.ADD) + notifyChangeObservers(Change.Add(fiber.logic)) } } diff --git a/node/src/main/kotlin/net/corda/node/services/vault/VaultSoftLockManager.kt b/node/src/main/kotlin/net/corda/node/services/vault/VaultSoftLockManager.kt index 80920f068a..475c4f900a 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/VaultSoftLockManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/VaultSoftLockManager.kt @@ -8,7 +8,6 @@ import net.corda.core.utilities.loggerFor import net.corda.core.utilities.trace import net.corda.node.services.statemachine.FlowStateMachineImpl import net.corda.node.services.statemachine.StateMachineManager -import net.corda.node.utilities.AddOrRemove import java.util.* class VaultSoftLockManager(val vault: VaultService, smm: StateMachineManager) { @@ -18,10 +17,10 @@ class VaultSoftLockManager(val vault: VaultService, smm: StateMachineManager) { } init { - smm.changes.subscribe { (logic, addOrRemove, id) -> - if (addOrRemove == AddOrRemove.REMOVE && (FlowStateMachineImpl.currentStateMachine())?.hasSoftLockedStates == true) { - log.trace { "$addOrRemove Flow name ${logic.javaClass} with id $id" } - unregisterSoftLocks(id, logic) + smm.changes.subscribe { change -> + if (change is StateMachineManager.Change.Removed && (FlowStateMachineImpl.currentStateMachine())?.hasSoftLockedStates == true) { + log.trace { "Remove flow name ${change.logic.javaClass} with id $change.id" } + unregisterSoftLocks(change.logic.runId, change.logic) } } diff --git a/node/src/main/kotlin/net/corda/node/utilities/AddOrRemove.kt b/node/src/main/kotlin/net/corda/node/utilities/AddOrRemove.kt index 155e7853d2..0cf5538a95 100644 --- a/node/src/main/kotlin/net/corda/node/utilities/AddOrRemove.kt +++ b/node/src/main/kotlin/net/corda/node/utilities/AddOrRemove.kt @@ -1,5 +1,6 @@ package net.corda.node.utilities +import net.corda.core.ErrorOr import net.corda.core.serialization.CordaSerializable /** diff --git a/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt index 4db7721bd7..746ea9822a 100644 --- a/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt @@ -14,7 +14,6 @@ import net.corda.node.services.MockServiceHubInternal import net.corda.node.services.persistence.DBCheckpointStorage import net.corda.node.services.statemachine.StateMachineManager import net.corda.node.services.vault.NodeVaultService -import net.corda.node.utilities.AddOrRemove import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.configureDatabase import net.corda.node.utilities.transaction @@ -89,7 +88,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() { smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1) val mockSMM = StateMachineManager(services, listOf(services, scheduler), DBCheckpointStorage(), smmExecutor, database) mockSMM.changes.subscribe { change -> - if (change.addOrRemove == AddOrRemove.REMOVE && mockSMM.allStateMachines.isEmpty()) { + if (change is StateMachineManager.Change.Removed && mockSMM.allStateMachines.isEmpty()) { smmHasRemovedAllFlows.countDown() } } diff --git a/node/src/test/kotlin/net/corda/node/services/events/ScheduledFlowTests.kt b/node/src/test/kotlin/net/corda/node/services/events/ScheduledFlowTests.kt index 1a40b04666..86bfa72216 100644 --- a/node/src/test/kotlin/net/corda/node/services/events/ScheduledFlowTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/events/ScheduledFlowTests.kt @@ -13,6 +13,7 @@ import net.corda.core.node.services.linearHeadsOfType import net.corda.core.utilities.DUMMY_NOTARY import net.corda.flows.FinalityFlow import net.corda.node.services.network.NetworkMapService +import net.corda.node.services.statemachine.StateMachineManager import net.corda.node.services.transactions.ValidatingNotaryService import net.corda.node.utilities.AddOrRemove import net.corda.node.utilities.transaction @@ -117,8 +118,8 @@ class ScheduledFlowTests { val stateMachines = nodeA.smm.track() var countScheduledFlows = 0 stateMachines.second.subscribe { - if (it.addOrRemove == AddOrRemove.ADD) { - val initiator = it.flowInitiator + if (it is StateMachineManager.Change.Add) { + val initiator = it.logic.stateMachine.flowInitiator if (initiator is FlowInitiator.Scheduled) countScheduledFlows++ } diff --git a/samples/irs-demo/src/main/kotlin/net/corda/simulation/Simulation.kt b/samples/irs-demo/src/main/kotlin/net/corda/simulation/Simulation.kt index 694c9f117a..97dd1efb21 100644 --- a/samples/irs-demo/src/main/kotlin/net/corda/simulation/Simulation.kt +++ b/samples/irs-demo/src/main/kotlin/net/corda/simulation/Simulation.kt @@ -17,8 +17,8 @@ import net.corda.core.utilities.ProgressTracker import net.corda.irs.api.NodeInterestRates import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.network.NetworkMapService +import net.corda.node.services.statemachine.StateMachineManager import net.corda.node.services.transactions.SimpleNotaryService -import net.corda.node.utilities.AddOrRemove import net.corda.node.utilities.transaction import net.corda.testing.TestNodeConfiguration import net.corda.testing.node.InMemoryMessagingNetwork @@ -240,7 +240,7 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean, protected fun showProgressFor(nodes: List) { nodes.forEach { node -> - node.smm.changes.filter { it.addOrRemove == AddOrRemove.ADD }.subscribe { + node.smm.changes.filter { it is StateMachineManager.Change.Add }.subscribe { linkFlowProgress(node, it.logic) } } @@ -257,7 +257,7 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean, protected fun showConsensusFor(nodes: List) { val node = nodes.first() - node.smm.changes.filter { it.addOrRemove == net.corda.node.utilities.AddOrRemove.ADD }.first().subscribe { + node.smm.changes.filter { it is StateMachineManager.Change.Add }.first().subscribe { linkConsensus(nodes, it.logic) } } diff --git a/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt b/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt index 5ceae8f7d6..28ca2a8637 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt @@ -20,7 +20,7 @@ import net.corda.node.internal.AbstractNode import net.corda.node.internal.NetworkMapInfo import net.corda.node.services.config.* import net.corda.node.services.statemachine.FlowStateMachineImpl -import net.corda.node.utilities.AddOrRemove.ADD +import net.corda.node.services.statemachine.StateMachineManager import net.corda.nodeapi.User import net.corda.nodeapi.config.SSLConfiguration import net.corda.testing.node.MockIdentityService @@ -146,7 +146,7 @@ fun getFreeLocalPorts(hostName: String, numberToAlloc: Int): List { inline fun > AbstractNode.initiateSingleShotFlow( markerClass: KClass>, noinline flowFactory: (Party) -> P): ListenableFuture

{ - val future = smm.changes.filter { it.addOrRemove == ADD && it.logic is P }.map { it.logic as P }.toFuture() + val future = smm.changes.filter { it is StateMachineManager.Change.Add && it.logic is P }.map { it.logic as P }.toFuture() services.registerFlowInitiator(markerClass.java, flowFactory) return future }