CORDA-1599 killFlow RPC call does not remove records from the Flow Hospital, if there are any. (#3332)

* CORDA-1599 Add a mechanism so killFlow can clean up the in memory data structures held by the Flow Hospital and other transition executor implementations.

* Fix bug and test

* Fix bug and test
This commit is contained in:
Rick Parker 2018-06-11 11:24:12 +01:00 committed by GitHub
parent 92c499d0b8
commit 8ac7690987
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 165 additions and 17 deletions

View File

@ -13,12 +13,20 @@ import net.corda.core.flows.FlowInfo
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
import net.corda.core.internal.*
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.TimedFlow
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.castIfPossible
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.DataFeed
import net.corda.core.serialization.*
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.Try
import net.corda.core.utilities.contextLogger
@ -30,7 +38,11 @@ import net.corda.node.services.config.shouldCheckCheckpoints
import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.messaging.ReceivedMessage
import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.createSubFlowVersion
import net.corda.node.services.statemachine.interceptors.*
import net.corda.node.services.statemachine.interceptors.DumpHistoryOnErrorInterceptor
import net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker
import net.corda.node.services.statemachine.interceptors.FiberDeserializationCheckingInterceptor
import net.corda.node.services.statemachine.interceptors.HospitalisingInterceptor
import net.corda.node.services.statemachine.interceptors.PrintingInterceptor
import net.corda.node.services.statemachine.transitions.StateMachine
import net.corda.node.utilities.AffinityExecutor
import net.corda.nodeapi.internal.persistence.CordaPersistence
@ -42,7 +54,11 @@ import rx.Observable
import rx.subjects.PublishSubject
import java.security.SecureRandom
import java.util.*
import java.util.concurrent.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import javax.annotation.concurrent.ThreadSafe
import kotlin.collections.ArrayList
@ -220,6 +236,7 @@ class SingleThreadedStateMachineManager(
database.transaction {
checkpointStorage.removeCheckpoint(id)
}
transitionExecutor.forceRemoveFlow(id)
}
} else {
// TODO replace with a clustered delete after we'll support clustered nodes

View File

@ -110,8 +110,8 @@ class StaffedFlowHospital {
/**
* The flow has been removed from the state machine.
*/
fun flowRemoved(flowFiber: FlowFiber) {
mutex.locked { patients.remove(flowFiber.id) }
fun flowRemoved(flowId: StateMachineRunId) {
mutex.locked { patients.remove(flowId) }
}
// TODO MedicalRecord subtypes can expose the Staff class, something which we probably don't want when wiring this method to RPC

View File

@ -1,6 +1,7 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.StateMachineRunId
import net.corda.node.services.statemachine.transitions.FlowContinuation
import net.corda.node.services.statemachine.transitions.TransitionResult
@ -17,6 +18,13 @@ interface TransitionExecutor {
transition: TransitionResult,
actionExecutor: ActionExecutor
): Pair<FlowContinuation, StateMachineState>
/**
* Called if the normal exit path where the new state is marked as removed via [StateMachineState.isRemoved] is not called.
* Currently this only happens via [StateMachineManager.killFlow]. This allows instances of this interface to clean up
* any state they are holding for a flow to prevent a memory leak.
*/
fun forceRemoveFlow(id: StateMachineRunId)
}
/**

View File

@ -1,6 +1,7 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.StateMachineRunId
import net.corda.core.utilities.contextLogger
import net.corda.node.services.statemachine.transitions.FlowContinuation
import net.corda.node.services.statemachine.transitions.TransitionResult
@ -20,6 +21,8 @@ class TransitionExecutorImpl(
val secureRandom: SecureRandom,
val database: CordaPersistence
) : TransitionExecutor {
override fun forceRemoveFlow(id: StateMachineRunId) {}
private companion object {
val log = contextLogger()
}

View File

@ -3,7 +3,12 @@ package net.corda.node.services.statemachine.interceptors
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.StateMachineRunId
import net.corda.core.utilities.contextLogger
import net.corda.node.services.statemachine.*
import net.corda.node.services.statemachine.ActionExecutor
import net.corda.node.services.statemachine.ErrorState
import net.corda.node.services.statemachine.Event
import net.corda.node.services.statemachine.FlowFiber
import net.corda.node.services.statemachine.StateMachineState
import net.corda.node.services.statemachine.TransitionExecutor
import net.corda.node.services.statemachine.transitions.FlowContinuation
import net.corda.node.services.statemachine.transitions.TransitionResult
import java.time.Instant
@ -48,4 +53,9 @@ class DumpHistoryOnErrorInterceptor(val delegate: TransitionExecutor) : Transiti
return Pair(continuation, nextState)
}
override fun forceRemoveFlow(id: StateMachineRunId) {
records.remove(id)
delegate.forceRemoveFlow(id)
}
}

View File

@ -1,11 +1,18 @@
package net.corda.node.services.statemachine.interceptors
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.StateMachineRunId
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.utilities.contextLogger
import net.corda.node.services.statemachine.*
import net.corda.node.services.statemachine.ActionExecutor
import net.corda.node.services.statemachine.Event
import net.corda.node.services.statemachine.FlowFiber
import net.corda.node.services.statemachine.FlowState
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.services.statemachine.StateMachineState
import net.corda.node.services.statemachine.TransitionExecutor
import net.corda.node.services.statemachine.transitions.FlowContinuation
import net.corda.node.services.statemachine.transitions.TransitionResult
import java.util.concurrent.LinkedBlockingQueue
@ -18,6 +25,10 @@ class FiberDeserializationCheckingInterceptor(
val fiberDeserializationChecker: FiberDeserializationChecker,
val delegate: TransitionExecutor
) : TransitionExecutor {
override fun forceRemoveFlow(id: StateMachineRunId) {
delegate.forceRemoveFlow(id)
}
@Suspendable
override fun executeTransition(
fiber: FlowFiber,

View File

@ -2,7 +2,13 @@ package net.corda.node.services.statemachine.interceptors
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.StateMachineRunId
import net.corda.node.services.statemachine.*
import net.corda.node.services.statemachine.ActionExecutor
import net.corda.node.services.statemachine.ErrorState
import net.corda.node.services.statemachine.Event
import net.corda.node.services.statemachine.FlowFiber
import net.corda.node.services.statemachine.StaffedFlowHospital
import net.corda.node.services.statemachine.StateMachineState
import net.corda.node.services.statemachine.TransitionExecutor
import net.corda.node.services.statemachine.transitions.FlowContinuation
import net.corda.node.services.statemachine.transitions.TransitionResult
import java.util.concurrent.ConcurrentHashMap
@ -15,6 +21,16 @@ class HospitalisingInterceptor(
private val flowHospital: StaffedFlowHospital,
private val delegate: TransitionExecutor
) : TransitionExecutor {
override fun forceRemoveFlow(id: StateMachineRunId) {
removeFlow(id)
delegate.forceRemoveFlow(id)
}
private fun removeFlow(id: StateMachineRunId) {
hospitalisedFlows.remove(id)
flowHospital.flowRemoved(id)
}
private val hospitalisedFlows = ConcurrentHashMap<StateMachineRunId, FlowFiber>()
@Suspendable
@ -41,8 +57,7 @@ class HospitalisingInterceptor(
}
}
if (nextState.isRemoved) {
hospitalisedFlows.remove(fiber.id)
flowHospital.flowRemoved(fiber)
removeFlow(fiber.id)
}
return Pair(continuation, nextState)
}

View File

@ -2,11 +2,21 @@ package net.corda.node.services.statemachine.interceptors
import co.paralleluniverse.fibers.Suspendable
import com.codahale.metrics.MetricRegistry
import net.corda.node.services.statemachine.*
import net.corda.core.flows.StateMachineRunId
import net.corda.node.services.statemachine.Action
import net.corda.node.services.statemachine.ActionExecutor
import net.corda.node.services.statemachine.Event
import net.corda.node.services.statemachine.FlowFiber
import net.corda.node.services.statemachine.StateMachineState
import net.corda.node.services.statemachine.TransitionExecutor
import net.corda.node.services.statemachine.transitions.FlowContinuation
import net.corda.node.services.statemachine.transitions.TransitionResult
class MetricInterceptor(val metrics: MetricRegistry, val delegate: TransitionExecutor) : TransitionExecutor {
override fun forceRemoveFlow(id: StateMachineRunId) {
delegate.forceRemoveFlow(id)
}
@Suspendable
override fun executeTransition(fiber: FlowFiber, previousState: StateMachineState, event: Event, transition: TransitionResult, actionExecutor: ActionExecutor): Pair<FlowContinuation, StateMachineState> {
val metricActionInterceptor = MetricActionInterceptor(metrics, actionExecutor)

View File

@ -1,8 +1,13 @@
package net.corda.node.services.statemachine.interceptors
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.StateMachineRunId
import net.corda.core.utilities.contextLogger
import net.corda.node.services.statemachine.*
import net.corda.node.services.statemachine.ActionExecutor
import net.corda.node.services.statemachine.Event
import net.corda.node.services.statemachine.FlowFiber
import net.corda.node.services.statemachine.StateMachineState
import net.corda.node.services.statemachine.TransitionExecutor
import net.corda.node.services.statemachine.transitions.FlowContinuation
import net.corda.node.services.statemachine.transitions.TransitionResult
import java.time.Instant
@ -11,6 +16,10 @@ import java.time.Instant
* This interceptor simply prints all state machine transitions. Useful for debugging.
*/
class PrintingInterceptor(val delegate: TransitionExecutor) : TransitionExecutor {
override fun forceRemoveFlow(id: StateMachineRunId) {
delegate.forceRemoveFlow(id)
}
companion object {
val log = contextLogger()
}

View File

@ -2,17 +2,17 @@ package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.concurrent.CordaFuture
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.packageName
import net.corda.core.messaging.MessageRecipients
import net.corda.core.utilities.UntrustworthyData
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.node.internal.StartedNode
import net.corda.node.services.FinalityHandler
import net.corda.node.services.messaging.Message
import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.nodeapi.internal.persistence.contextTransaction
@ -102,6 +102,51 @@ class RetryFlowMockTest {
assertThat(nodeA.smm.flowHospital.track().snapshot).isEmpty()
assertEquals(2, RetryFlow.count)
}
@Test
fun `Patient records do not leak in hospital when using killFlow`() {
val flow: FlowStateMachine<Unit> = nodeA.services.startFlow(FinalityHandler(object : FlowSession() {
override val counterparty: Party
get() = TODO("not implemented")
override fun getCounterpartyFlowInfo(maySkipCheckpoint: Boolean): FlowInfo {
TODO("not implemented")
}
override fun getCounterpartyFlowInfo(): FlowInfo {
TODO("not implemented")
}
override fun <R : Any> sendAndReceive(receiveType: Class<R>, payload: Any, maySkipCheckpoint: Boolean): UntrustworthyData<R> {
TODO("not implemented")
}
override fun <R : Any> sendAndReceive(receiveType: Class<R>, payload: Any): UntrustworthyData<R> {
TODO("not implemented")
}
override fun <R : Any> receive(receiveType: Class<R>, maySkipCheckpoint: Boolean): UntrustworthyData<R> {
TODO("not implemented")
}
override fun <R : Any> receive(receiveType: Class<R>): UntrustworthyData<R> {
TODO("not implemented")
}
override fun send(payload: Any, maySkipCheckpoint: Boolean) {
TODO("not implemented")
}
override fun send(payload: Any) {
TODO("not implemented")
}
}), nodeA.services.newContext()).get()
// Make sure we have seen an update from the hospital, and thus the flow went there.
nodeA.smm.flowHospital.track().updates.toBlocking().first()
// Killing it should remove it.
nodeA.smm.killFlow(flow.id)
assertThat(nodeA.smm.flowHospital.track().snapshot).isEmpty()
}
}
class LimitedRetryCausingError : ConstraintViolationException("Test message", SQLException(), "Test constraint")
@ -126,6 +171,26 @@ class RetryFlow(private val i: Int) : FlowLogic<Unit>() {
}
}
class RetryAndSleepFlow(private val i: Int) : FlowLogic<Unit>() {
companion object {
var count = 0
}
@Suspendable
override fun call() {
logger.info("Hello $count")
if (count++ < i) {
if (i == Int.MAX_VALUE) {
throw LimitedRetryCausingError()
} else {
throw RetryCausingError()
}
} else {
sleep(Duration.ofDays(1))
}
}
}
@InitiatingFlow
class SendAndRetryFlow(private val i: Int, private val other: Party) : FlowLogic<Unit>() {
companion object {