From 8ac7690987577840c87ba6a9ff75e328526596ea Mon Sep 17 00:00:00 2001 From: Rick Parker Date: Mon, 11 Jun 2018 11:24:12 +0100 Subject: [PATCH] CORDA-1599 killFlow RPC call does not remove records from the Flow Hospital, if there are any. (#3332) * CORDA-1599 Add a mechanism so killFlow can clean up the in memory data structures held by the Flow Hospital and other transition executor implementations. * Fix bug and test * Fix bug and test --- .../SingleThreadedStateMachineManager.kt | 25 ++++++- .../statemachine/StaffedFlowHospital.kt | 4 +- .../statemachine/TransitionExecutor.kt | 8 ++ .../statemachine/TransitionExecutorImpl.kt | 3 + .../DumpHistoryOnErrorInterceptor.kt | 12 ++- ...FiberDeserializationCheckingInterceptor.kt | 13 +++- .../interceptors/HospitalisingInterceptor.kt | 21 +++++- .../interceptors/MetricInterceptor.kt | 12 ++- .../interceptors/PrintingInterceptor.kt | 11 ++- .../statemachine/RetryFlowMockTest.kt | 73 ++++++++++++++++++- 10 files changed, 165 insertions(+), 17 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index eb6a659cdc..a6dcb4eb78 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -13,12 +13,20 @@ import net.corda.core.flows.FlowInfo import net.corda.core.flows.FlowLogic import net.corda.core.flows.StateMachineRunId import net.corda.core.identity.Party -import net.corda.core.internal.* +import net.corda.core.internal.FlowStateMachine +import net.corda.core.internal.ThreadBox +import net.corda.core.internal.TimedFlow +import net.corda.core.internal.bufferUntilSubscribed +import net.corda.core.internal.castIfPossible import net.corda.core.internal.concurrent.OpenFuture import net.corda.core.internal.concurrent.map import net.corda.core.internal.concurrent.openFuture import net.corda.core.messaging.DataFeed -import net.corda.core.serialization.* +import net.corda.core.serialization.SerializationContext +import net.corda.core.serialization.SerializationDefaults +import net.corda.core.serialization.SerializedBytes +import net.corda.core.serialization.deserialize +import net.corda.core.serialization.serialize import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.Try import net.corda.core.utilities.contextLogger @@ -30,7 +38,11 @@ import net.corda.node.services.config.shouldCheckCheckpoints import net.corda.node.services.messaging.DeduplicationHandler import net.corda.node.services.messaging.ReceivedMessage import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.createSubFlowVersion -import net.corda.node.services.statemachine.interceptors.* +import net.corda.node.services.statemachine.interceptors.DumpHistoryOnErrorInterceptor +import net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker +import net.corda.node.services.statemachine.interceptors.FiberDeserializationCheckingInterceptor +import net.corda.node.services.statemachine.interceptors.HospitalisingInterceptor +import net.corda.node.services.statemachine.interceptors.PrintingInterceptor import net.corda.node.services.statemachine.transitions.StateMachine import net.corda.node.utilities.AffinityExecutor import net.corda.nodeapi.internal.persistence.CordaPersistence @@ -42,7 +54,11 @@ import rx.Observable import rx.subjects.PublishSubject import java.security.SecureRandom import java.util.* -import java.util.concurrent.* +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import java.util.concurrent.ScheduledFuture +import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock import javax.annotation.concurrent.ThreadSafe import kotlin.collections.ArrayList @@ -220,6 +236,7 @@ class SingleThreadedStateMachineManager( database.transaction { checkpointStorage.removeCheckpoint(id) } + transitionExecutor.forceRemoveFlow(id) } } else { // TODO replace with a clustered delete after we'll support clustered nodes diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt index a08ebada2e..e30c7f7916 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt @@ -110,8 +110,8 @@ class StaffedFlowHospital { /** * The flow has been removed from the state machine. */ - fun flowRemoved(flowFiber: FlowFiber) { - mutex.locked { patients.remove(flowFiber.id) } + fun flowRemoved(flowId: StateMachineRunId) { + mutex.locked { patients.remove(flowId) } } // TODO MedicalRecord subtypes can expose the Staff class, something which we probably don't want when wiring this method to RPC diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutor.kt index 7bf29e3f14..a5749b2105 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutor.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutor.kt @@ -1,6 +1,7 @@ package net.corda.node.services.statemachine import co.paralleluniverse.fibers.Suspendable +import net.corda.core.flows.StateMachineRunId import net.corda.node.services.statemachine.transitions.FlowContinuation import net.corda.node.services.statemachine.transitions.TransitionResult @@ -17,6 +18,13 @@ interface TransitionExecutor { transition: TransitionResult, actionExecutor: ActionExecutor ): Pair + + /** + * Called if the normal exit path where the new state is marked as removed via [StateMachineState.isRemoved] is not called. + * Currently this only happens via [StateMachineManager.killFlow]. This allows instances of this interface to clean up + * any state they are holding for a flow to prevent a memory leak. + */ + fun forceRemoveFlow(id: StateMachineRunId) } /** diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutorImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutorImpl.kt index 2cf328a450..f8fe721679 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutorImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutorImpl.kt @@ -1,6 +1,7 @@ package net.corda.node.services.statemachine import co.paralleluniverse.fibers.Suspendable +import net.corda.core.flows.StateMachineRunId import net.corda.core.utilities.contextLogger import net.corda.node.services.statemachine.transitions.FlowContinuation import net.corda.node.services.statemachine.transitions.TransitionResult @@ -20,6 +21,8 @@ class TransitionExecutorImpl( val secureRandom: SecureRandom, val database: CordaPersistence ) : TransitionExecutor { + override fun forceRemoveFlow(id: StateMachineRunId) {} + private companion object { val log = contextLogger() } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/DumpHistoryOnErrorInterceptor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/DumpHistoryOnErrorInterceptor.kt index 2e57e0bb14..c5ef2bbca8 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/DumpHistoryOnErrorInterceptor.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/DumpHistoryOnErrorInterceptor.kt @@ -3,7 +3,12 @@ package net.corda.node.services.statemachine.interceptors import co.paralleluniverse.fibers.Suspendable import net.corda.core.flows.StateMachineRunId import net.corda.core.utilities.contextLogger -import net.corda.node.services.statemachine.* +import net.corda.node.services.statemachine.ActionExecutor +import net.corda.node.services.statemachine.ErrorState +import net.corda.node.services.statemachine.Event +import net.corda.node.services.statemachine.FlowFiber +import net.corda.node.services.statemachine.StateMachineState +import net.corda.node.services.statemachine.TransitionExecutor import net.corda.node.services.statemachine.transitions.FlowContinuation import net.corda.node.services.statemachine.transitions.TransitionResult import java.time.Instant @@ -48,4 +53,9 @@ class DumpHistoryOnErrorInterceptor(val delegate: TransitionExecutor) : Transiti return Pair(continuation, nextState) } + + override fun forceRemoveFlow(id: StateMachineRunId) { + records.remove(id) + delegate.forceRemoveFlow(id) + } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/FiberDeserializationCheckingInterceptor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/FiberDeserializationCheckingInterceptor.kt index cbde382f4d..67b1733a90 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/FiberDeserializationCheckingInterceptor.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/FiberDeserializationCheckingInterceptor.kt @@ -1,11 +1,18 @@ package net.corda.node.services.statemachine.interceptors import co.paralleluniverse.fibers.Suspendable +import net.corda.core.flows.StateMachineRunId import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.deserialize import net.corda.core.utilities.contextLogger -import net.corda.node.services.statemachine.* +import net.corda.node.services.statemachine.ActionExecutor +import net.corda.node.services.statemachine.Event +import net.corda.node.services.statemachine.FlowFiber +import net.corda.node.services.statemachine.FlowState +import net.corda.node.services.statemachine.FlowStateMachineImpl +import net.corda.node.services.statemachine.StateMachineState +import net.corda.node.services.statemachine.TransitionExecutor import net.corda.node.services.statemachine.transitions.FlowContinuation import net.corda.node.services.statemachine.transitions.TransitionResult import java.util.concurrent.LinkedBlockingQueue @@ -18,6 +25,10 @@ class FiberDeserializationCheckingInterceptor( val fiberDeserializationChecker: FiberDeserializationChecker, val delegate: TransitionExecutor ) : TransitionExecutor { + override fun forceRemoveFlow(id: StateMachineRunId) { + delegate.forceRemoveFlow(id) + } + @Suspendable override fun executeTransition( fiber: FlowFiber, diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/HospitalisingInterceptor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/HospitalisingInterceptor.kt index 4a20ef9116..db6cbed48b 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/HospitalisingInterceptor.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/HospitalisingInterceptor.kt @@ -2,7 +2,13 @@ package net.corda.node.services.statemachine.interceptors import co.paralleluniverse.fibers.Suspendable import net.corda.core.flows.StateMachineRunId -import net.corda.node.services.statemachine.* +import net.corda.node.services.statemachine.ActionExecutor +import net.corda.node.services.statemachine.ErrorState +import net.corda.node.services.statemachine.Event +import net.corda.node.services.statemachine.FlowFiber +import net.corda.node.services.statemachine.StaffedFlowHospital +import net.corda.node.services.statemachine.StateMachineState +import net.corda.node.services.statemachine.TransitionExecutor import net.corda.node.services.statemachine.transitions.FlowContinuation import net.corda.node.services.statemachine.transitions.TransitionResult import java.util.concurrent.ConcurrentHashMap @@ -15,6 +21,16 @@ class HospitalisingInterceptor( private val flowHospital: StaffedFlowHospital, private val delegate: TransitionExecutor ) : TransitionExecutor { + override fun forceRemoveFlow(id: StateMachineRunId) { + removeFlow(id) + delegate.forceRemoveFlow(id) + } + + private fun removeFlow(id: StateMachineRunId) { + hospitalisedFlows.remove(id) + flowHospital.flowRemoved(id) + } + private val hospitalisedFlows = ConcurrentHashMap() @Suspendable @@ -41,8 +57,7 @@ class HospitalisingInterceptor( } } if (nextState.isRemoved) { - hospitalisedFlows.remove(fiber.id) - flowHospital.flowRemoved(fiber) + removeFlow(fiber.id) } return Pair(continuation, nextState) } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/MetricInterceptor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/MetricInterceptor.kt index 4f7f140673..bbd4e7472e 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/MetricInterceptor.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/MetricInterceptor.kt @@ -2,11 +2,21 @@ package net.corda.node.services.statemachine.interceptors import co.paralleluniverse.fibers.Suspendable import com.codahale.metrics.MetricRegistry -import net.corda.node.services.statemachine.* +import net.corda.core.flows.StateMachineRunId +import net.corda.node.services.statemachine.Action +import net.corda.node.services.statemachine.ActionExecutor +import net.corda.node.services.statemachine.Event +import net.corda.node.services.statemachine.FlowFiber +import net.corda.node.services.statemachine.StateMachineState +import net.corda.node.services.statemachine.TransitionExecutor import net.corda.node.services.statemachine.transitions.FlowContinuation import net.corda.node.services.statemachine.transitions.TransitionResult class MetricInterceptor(val metrics: MetricRegistry, val delegate: TransitionExecutor) : TransitionExecutor { + override fun forceRemoveFlow(id: StateMachineRunId) { + delegate.forceRemoveFlow(id) + } + @Suspendable override fun executeTransition(fiber: FlowFiber, previousState: StateMachineState, event: Event, transition: TransitionResult, actionExecutor: ActionExecutor): Pair { val metricActionInterceptor = MetricActionInterceptor(metrics, actionExecutor) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/PrintingInterceptor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/PrintingInterceptor.kt index a0ca6d6660..e23016e84e 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/PrintingInterceptor.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/PrintingInterceptor.kt @@ -1,8 +1,13 @@ package net.corda.node.services.statemachine.interceptors import co.paralleluniverse.fibers.Suspendable +import net.corda.core.flows.StateMachineRunId import net.corda.core.utilities.contextLogger -import net.corda.node.services.statemachine.* +import net.corda.node.services.statemachine.ActionExecutor +import net.corda.node.services.statemachine.Event +import net.corda.node.services.statemachine.FlowFiber +import net.corda.node.services.statemachine.StateMachineState +import net.corda.node.services.statemachine.TransitionExecutor import net.corda.node.services.statemachine.transitions.FlowContinuation import net.corda.node.services.statemachine.transitions.TransitionResult import java.time.Instant @@ -11,6 +16,10 @@ import java.time.Instant * This interceptor simply prints all state machine transitions. Useful for debugging. */ class PrintingInterceptor(val delegate: TransitionExecutor) : TransitionExecutor { + override fun forceRemoveFlow(id: StateMachineRunId) { + delegate.forceRemoveFlow(id) + } + companion object { val log = contextLogger() } diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt index fca7e8687b..56d009eeef 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt @@ -2,17 +2,17 @@ package net.corda.node.services.statemachine import co.paralleluniverse.fibers.Suspendable import net.corda.core.concurrent.CordaFuture -import net.corda.core.flows.FlowLogic -import net.corda.core.flows.FlowSession -import net.corda.core.flows.InitiatedBy -import net.corda.core.flows.InitiatingFlow +import net.corda.core.flows.* import net.corda.core.identity.Party +import net.corda.core.internal.FlowStateMachine import net.corda.core.internal.concurrent.flatMap import net.corda.core.internal.packageName import net.corda.core.messaging.MessageRecipients +import net.corda.core.utilities.UntrustworthyData import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.unwrap import net.corda.node.internal.StartedNode +import net.corda.node.services.FinalityHandler import net.corda.node.services.messaging.Message import net.corda.node.services.persistence.DBTransactionStorage import net.corda.nodeapi.internal.persistence.contextTransaction @@ -102,6 +102,51 @@ class RetryFlowMockTest { assertThat(nodeA.smm.flowHospital.track().snapshot).isEmpty() assertEquals(2, RetryFlow.count) } + + @Test + fun `Patient records do not leak in hospital when using killFlow`() { + val flow: FlowStateMachine = nodeA.services.startFlow(FinalityHandler(object : FlowSession() { + override val counterparty: Party + get() = TODO("not implemented") + + override fun getCounterpartyFlowInfo(maySkipCheckpoint: Boolean): FlowInfo { + TODO("not implemented") + } + + override fun getCounterpartyFlowInfo(): FlowInfo { + TODO("not implemented") + } + + override fun sendAndReceive(receiveType: Class, payload: Any, maySkipCheckpoint: Boolean): UntrustworthyData { + TODO("not implemented") + } + + override fun sendAndReceive(receiveType: Class, payload: Any): UntrustworthyData { + TODO("not implemented") + } + + override fun receive(receiveType: Class, maySkipCheckpoint: Boolean): UntrustworthyData { + TODO("not implemented") + } + + override fun receive(receiveType: Class): UntrustworthyData { + TODO("not implemented") + } + + override fun send(payload: Any, maySkipCheckpoint: Boolean) { + TODO("not implemented") + } + + override fun send(payload: Any) { + TODO("not implemented") + } + }), nodeA.services.newContext()).get() + // Make sure we have seen an update from the hospital, and thus the flow went there. + nodeA.smm.flowHospital.track().updates.toBlocking().first() + // Killing it should remove it. + nodeA.smm.killFlow(flow.id) + assertThat(nodeA.smm.flowHospital.track().snapshot).isEmpty() + } } class LimitedRetryCausingError : ConstraintViolationException("Test message", SQLException(), "Test constraint") @@ -126,6 +171,26 @@ class RetryFlow(private val i: Int) : FlowLogic() { } } +class RetryAndSleepFlow(private val i: Int) : FlowLogic() { + companion object { + var count = 0 + } + + @Suspendable + override fun call() { + logger.info("Hello $count") + if (count++ < i) { + if (i == Int.MAX_VALUE) { + throw LimitedRetryCausingError() + } else { + throw RetryCausingError() + } + } else { + sleep(Duration.ofDays(1)) + } + } +} + @InitiatingFlow class SendAndRetryFlow(private val i: Int, private val other: Party) : FlowLogic() { companion object {