mirror of
https://github.com/corda/corda.git
synced 2025-06-17 14:48:16 +00:00
CORDA-2129: Attempts to initiate a flow that's unknown is paused and instead sent to the flow hospital (#4181)
This allows for the case of a missing CorDapp to be installed on node restart and recover the original flow initiate request.
This commit is contained in:
@ -1,8 +1,16 @@
|
|||||||
package net.corda.node.services.statemachine
|
package net.corda.node.services.statemachine
|
||||||
|
|
||||||
import net.corda.core.CordaException
|
import net.corda.core.CordaException
|
||||||
|
import net.corda.core.flows.FlowLogic
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An exception propagated and thrown in case a session initiation fails.
|
* An exception propagated and thrown in case a session initiation fails.
|
||||||
*/
|
*/
|
||||||
class SessionRejectException(message: String) : CordaException(message)
|
open class SessionRejectException(message: String) : CordaException(message) {
|
||||||
|
class UnknownClass(val initiatorFlowClassName: String) : SessionRejectException("Don't know $initiatorFlowClassName")
|
||||||
|
|
||||||
|
class NotAFlow(val initiatorClass: Class<*>) : SessionRejectException("${initiatorClass.name} is not a flow")
|
||||||
|
|
||||||
|
class NotRegistered(val initiatorFlowClass: Class<out FlowLogic<*>>) : SessionRejectException("${initiatorFlowClass.name} is not registered")
|
||||||
|
}
|
||||||
|
|
||||||
|
@ -18,7 +18,8 @@ import net.corda.core.internal.concurrent.OpenFuture
|
|||||||
import net.corda.core.internal.concurrent.map
|
import net.corda.core.internal.concurrent.map
|
||||||
import net.corda.core.internal.concurrent.openFuture
|
import net.corda.core.internal.concurrent.openFuture
|
||||||
import net.corda.core.messaging.DataFeed
|
import net.corda.core.messaging.DataFeed
|
||||||
import net.corda.core.serialization.*
|
import net.corda.core.serialization.SerializedBytes
|
||||||
|
import net.corda.core.serialization.deserialize
|
||||||
import net.corda.core.serialization.internal.CheckpointSerializationContext
|
import net.corda.core.serialization.internal.CheckpointSerializationContext
|
||||||
import net.corda.core.serialization.internal.CheckpointSerializationDefaults
|
import net.corda.core.serialization.internal.CheckpointSerializationDefaults
|
||||||
import net.corda.core.serialization.internal.checkpointDeserialize
|
import net.corda.core.serialization.internal.checkpointDeserialize
|
||||||
@ -32,7 +33,6 @@ import net.corda.node.services.api.CheckpointStorage
|
|||||||
import net.corda.node.services.api.ServiceHubInternal
|
import net.corda.node.services.api.ServiceHubInternal
|
||||||
import net.corda.node.services.config.shouldCheckCheckpoints
|
import net.corda.node.services.config.shouldCheckCheckpoints
|
||||||
import net.corda.node.services.messaging.DeduplicationHandler
|
import net.corda.node.services.messaging.DeduplicationHandler
|
||||||
import net.corda.node.services.messaging.ReceivedMessage
|
|
||||||
import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.createSubFlowVersion
|
import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.createSubFlowVersion
|
||||||
import net.corda.node.services.statemachine.interceptors.*
|
import net.corda.node.services.statemachine.interceptors.*
|
||||||
import net.corda.node.services.statemachine.transitions.StateMachine
|
import net.corda.node.services.statemachine.transitions.StateMachine
|
||||||
@ -92,8 +92,6 @@ class SingleThreadedStateMachineManager(
|
|||||||
val timedFlows = HashMap<StateMachineRunId, ScheduledTimeout>()
|
val timedFlows = HashMap<StateMachineRunId, ScheduledTimeout>()
|
||||||
}
|
}
|
||||||
|
|
||||||
override val flowHospital: StaffedFlowHospital = StaffedFlowHospital()
|
|
||||||
|
|
||||||
private val mutex = ThreadBox(InnerState())
|
private val mutex = ThreadBox(InnerState())
|
||||||
private val scheduler = FiberExecutorScheduler("Same thread scheduler", executor)
|
private val scheduler = FiberExecutorScheduler("Same thread scheduler", executor)
|
||||||
private val timeoutScheduler = Executors.newScheduledThreadPool(1)
|
private val timeoutScheduler = Executors.newScheduledThreadPool(1)
|
||||||
@ -104,12 +102,14 @@ class SingleThreadedStateMachineManager(
|
|||||||
private val sessionToFlow = ConcurrentHashMap<SessionId, StateMachineRunId>()
|
private val sessionToFlow = ConcurrentHashMap<SessionId, StateMachineRunId>()
|
||||||
private val flowMessaging: FlowMessaging = FlowMessagingImpl(serviceHub)
|
private val flowMessaging: FlowMessaging = FlowMessagingImpl(serviceHub)
|
||||||
private val fiberDeserializationChecker = if (serviceHub.configuration.shouldCheckCheckpoints()) FiberDeserializationChecker() else null
|
private val fiberDeserializationChecker = if (serviceHub.configuration.shouldCheckCheckpoints()) FiberDeserializationChecker() else null
|
||||||
private val transitionExecutor = makeTransitionExecutor()
|
|
||||||
private val ourSenderUUID = serviceHub.networkService.ourSenderUUID
|
private val ourSenderUUID = serviceHub.networkService.ourSenderUUID
|
||||||
|
|
||||||
private var checkpointSerializationContext: CheckpointSerializationContext? = null
|
private var checkpointSerializationContext: CheckpointSerializationContext? = null
|
||||||
private var actionExecutor: ActionExecutor? = null
|
private var actionExecutor: ActionExecutor? = null
|
||||||
|
|
||||||
|
override val flowHospital: StaffedFlowHospital = StaffedFlowHospital(flowMessaging, ourSenderUUID)
|
||||||
|
private val transitionExecutor = makeTransitionExecutor()
|
||||||
|
|
||||||
override val allStateMachines: List<FlowLogic<*>>
|
override val allStateMachines: List<FlowLogic<*>>
|
||||||
get() = mutex.locked { flows.values.map { it.fiber.logic } }
|
get() = mutex.locked { flows.values.map { it.fiber.logic } }
|
||||||
|
|
||||||
@ -204,7 +204,7 @@ class SingleThreadedStateMachineManager(
|
|||||||
invocationContext = context,
|
invocationContext = context,
|
||||||
flowLogic = flowLogic,
|
flowLogic = flowLogic,
|
||||||
flowStart = FlowStart.Explicit,
|
flowStart = FlowStart.Explicit,
|
||||||
ourIdentity = ourIdentity ?: getOurFirstIdentity(),
|
ourIdentity = ourIdentity ?: ourFirstIdentity,
|
||||||
deduplicationHandler = deduplicationHandler,
|
deduplicationHandler = deduplicationHandler,
|
||||||
isStartIdempotent = false
|
isStartIdempotent = false
|
||||||
)
|
)
|
||||||
@ -402,23 +402,23 @@ class SingleThreadedStateMachineManager(
|
|||||||
}
|
}
|
||||||
|
|
||||||
private fun onSessionMessage(event: ExternalEvent.ExternalMessageEvent) {
|
private fun onSessionMessage(event: ExternalEvent.ExternalMessageEvent) {
|
||||||
val message: ReceivedMessage = event.receivedMessage
|
val peer = event.receivedMessage.peer
|
||||||
val deduplicationHandler: DeduplicationHandler = event.deduplicationHandler
|
|
||||||
val peer = message.peer
|
|
||||||
val sessionMessage = try {
|
val sessionMessage = try {
|
||||||
message.data.deserialize<SessionMessage>()
|
event.receivedMessage.data.deserialize<SessionMessage>()
|
||||||
} catch (ex: Exception) {
|
} catch (ex: Exception) {
|
||||||
logger.error("Received corrupt SessionMessage data from $peer")
|
logger.error("Received corrupt SessionMessage data from $peer")
|
||||||
deduplicationHandler.afterDatabaseTransaction()
|
event.deduplicationHandler.afterDatabaseTransaction()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
val sender = serviceHub.networkMapCache.getPeerByLegalName(peer)
|
val sender = serviceHub.networkMapCache.getPeerByLegalName(peer)
|
||||||
if (sender != null) {
|
if (sender != null) {
|
||||||
when (sessionMessage) {
|
when (sessionMessage) {
|
||||||
is ExistingSessionMessage -> onExistingSessionMessage(sessionMessage, deduplicationHandler, sender)
|
is ExistingSessionMessage -> onExistingSessionMessage(sessionMessage, event.deduplicationHandler, sender)
|
||||||
is InitialSessionMessage -> onSessionInit(sessionMessage, message.platformVersion, deduplicationHandler, sender)
|
is InitialSessionMessage -> onSessionInit(sessionMessage, sender, event)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
// TODO Send the event to the flow hospital to be retried on network map update
|
||||||
|
// TODO Test that restarting the node attempts to retry
|
||||||
logger.error("Unknown peer $peer in $sessionMessage")
|
logger.error("Unknown peer $peer in $sessionMessage")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -448,14 +448,8 @@ class SingleThreadedStateMachineManager(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun onSessionInit(sessionMessage: InitialSessionMessage, senderPlatformVersion: Int, deduplicationHandler: DeduplicationHandler, sender: Party) {
|
private fun onSessionInit(sessionMessage: InitialSessionMessage, sender: Party, event: ExternalEvent.ExternalMessageEvent) {
|
||||||
fun createErrorMessage(initiatorSessionId: SessionId, message: String): ExistingSessionMessage {
|
try {
|
||||||
val errorId = secureRandom.nextLong()
|
|
||||||
val payload = RejectSessionMessage(message, errorId)
|
|
||||||
return ExistingSessionMessage(initiatorSessionId, payload)
|
|
||||||
}
|
|
||||||
|
|
||||||
val replyError = try {
|
|
||||||
val initiatedFlowFactory = getInitiatedFlowFactory(sessionMessage)
|
val initiatedFlowFactory = getInitiatedFlowFactory(sessionMessage)
|
||||||
val initiatedSessionId = SessionId.createRandom(secureRandom)
|
val initiatedSessionId = SessionId.createRandom(secureRandom)
|
||||||
val senderSession = FlowSessionImpl(sender, initiatedSessionId)
|
val senderSession = FlowSessionImpl(sender, initiatedSessionId)
|
||||||
@ -465,40 +459,34 @@ class SingleThreadedStateMachineManager(
|
|||||||
is InitiatedFlowFactory.CorDapp -> FlowInfo(initiatedFlowFactory.flowVersion, initiatedFlowFactory.appName)
|
is InitiatedFlowFactory.CorDapp -> FlowInfo(initiatedFlowFactory.flowVersion, initiatedFlowFactory.appName)
|
||||||
}
|
}
|
||||||
val senderCoreFlowVersion = when (initiatedFlowFactory) {
|
val senderCoreFlowVersion = when (initiatedFlowFactory) {
|
||||||
is InitiatedFlowFactory.Core -> senderPlatformVersion
|
is InitiatedFlowFactory.Core -> event.receivedMessage.platformVersion
|
||||||
is InitiatedFlowFactory.CorDapp -> null
|
is InitiatedFlowFactory.CorDapp -> null
|
||||||
}
|
}
|
||||||
startInitiatedFlow(flowLogic, deduplicationHandler, senderSession, initiatedSessionId, sessionMessage, senderCoreFlowVersion, initiatedFlowInfo)
|
startInitiatedFlow(flowLogic, event.deduplicationHandler, senderSession, initiatedSessionId, sessionMessage, senderCoreFlowVersion, initiatedFlowInfo)
|
||||||
null
|
} catch (t: Throwable) {
|
||||||
} catch (exception: Exception) {
|
logger.warn("Unable to initiate flow from $sender (appName=${sessionMessage.appName} " +
|
||||||
logger.warn("Exception while creating initiated flow", exception)
|
"flowVersion=${sessionMessage.flowVersion}), sending to the flow hospital", t)
|
||||||
createErrorMessage(
|
flowHospital.sessionInitErrored(sessionMessage, sender, event, t)
|
||||||
sessionMessage.initiatorSessionId,
|
|
||||||
(exception as? SessionRejectException)?.message ?: "Unable to establish session"
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
if (replyError != null) {
|
|
||||||
flowMessaging.sendSessionMessage(sender, replyError, SenderDeduplicationId(DeduplicationId.createRandom(secureRandom), ourSenderUUID))
|
|
||||||
deduplicationHandler.afterDatabaseTransaction()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO this is a temporary hack until we figure out multiple identities
|
// TODO this is a temporary hack until we figure out multiple identities
|
||||||
private fun getOurFirstIdentity(): Party {
|
private val ourFirstIdentity: Party get() = serviceHub.myInfo.legalIdentities[0]
|
||||||
return serviceHub.myInfo.legalIdentities[0]
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun getInitiatedFlowFactory(message: InitialSessionMessage): InitiatedFlowFactory<*> {
|
private fun getInitiatedFlowFactory(message: InitialSessionMessage): InitiatedFlowFactory<*> {
|
||||||
val initiatingFlowClass = try {
|
val initiatorClass = try {
|
||||||
Class.forName(message.initiatorFlowClassName, true, classloader).asSubclass(FlowLogic::class.java)
|
Class.forName(message.initiatorFlowClassName, true, classloader)
|
||||||
} catch (e: ClassNotFoundException) {
|
} catch (e: ClassNotFoundException) {
|
||||||
throw SessionRejectException("Don't know ${message.initiatorFlowClassName}")
|
throw SessionRejectException.UnknownClass(message.initiatorFlowClassName)
|
||||||
} catch (e: ClassCastException) {
|
|
||||||
throw SessionRejectException("${message.initiatorFlowClassName} is not a flow")
|
|
||||||
}
|
}
|
||||||
return serviceHub.getFlowFactory(initiatingFlowClass)
|
|
||||||
?: throw SessionRejectException("$initiatingFlowClass is not registered")
|
val initiatorFlowClass = try {
|
||||||
|
initiatorClass.asSubclass(FlowLogic::class.java)
|
||||||
|
} catch (e: ClassCastException) {
|
||||||
|
throw SessionRejectException.NotAFlow(initiatorClass)
|
||||||
|
}
|
||||||
|
|
||||||
|
return serviceHub.getFlowFactory(initiatorFlowClass) ?: throw SessionRejectException.NotRegistered(initiatorFlowClass)
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun <A> startInitiatedFlow(
|
private fun <A> startInitiatedFlow(
|
||||||
@ -511,7 +499,7 @@ class SingleThreadedStateMachineManager(
|
|||||||
initiatedFlowInfo: FlowInfo
|
initiatedFlowInfo: FlowInfo
|
||||||
) {
|
) {
|
||||||
val flowStart = FlowStart.Initiated(peerSession, initiatedSessionId, initiatingMessage, senderCoreFlowVersion, initiatedFlowInfo)
|
val flowStart = FlowStart.Initiated(peerSession, initiatedSessionId, initiatingMessage, senderCoreFlowVersion, initiatedFlowInfo)
|
||||||
val ourIdentity = getOurFirstIdentity()
|
val ourIdentity = ourFirstIdentity
|
||||||
startFlowInternal(
|
startFlowInternal(
|
||||||
InvocationContext.peer(peerSession.counterparty.name), flowLogic, flowStart, ourIdentity,
|
InvocationContext.peer(peerSession.counterparty.name), flowLogic, flowStart, ourIdentity,
|
||||||
initiatingMessageDeduplicationHandler,
|
initiatingMessageDeduplicationHandler,
|
||||||
|
@ -1,6 +1,8 @@
|
|||||||
package net.corda.node.services.statemachine
|
package net.corda.node.services.statemachine
|
||||||
|
|
||||||
|
import net.corda.core.crypto.newSecureRandom
|
||||||
import net.corda.core.flows.StateMachineRunId
|
import net.corda.core.flows.StateMachineRunId
|
||||||
|
import net.corda.core.identity.Party
|
||||||
import net.corda.core.internal.ThreadBox
|
import net.corda.core.internal.ThreadBox
|
||||||
import net.corda.core.internal.TimedFlow
|
import net.corda.core.internal.TimedFlow
|
||||||
import net.corda.core.internal.bufferUntilSubscribed
|
import net.corda.core.internal.bufferUntilSubscribed
|
||||||
@ -16,65 +18,101 @@ import java.util.*
|
|||||||
/**
|
/**
|
||||||
* This hospital consults "staff" to see if they can automatically diagnose and treat flows.
|
* This hospital consults "staff" to see if they can automatically diagnose and treat flows.
|
||||||
*/
|
*/
|
||||||
class StaffedFlowHospital {
|
class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val ourSenderUUID: String) {
|
||||||
private companion object {
|
private companion object {
|
||||||
private val log = contextLogger()
|
private val log = contextLogger()
|
||||||
private val staff = listOf(DeadlockNurse, DuplicateInsertSpecialist, DoctorTimeout, FinalityDoctor)
|
private val staff = listOf(DeadlockNurse, DuplicateInsertSpecialist, DoctorTimeout, FinalityDoctor)
|
||||||
}
|
}
|
||||||
|
|
||||||
private val mutex = ThreadBox(object {
|
private val mutex = ThreadBox(object {
|
||||||
val patients = HashMap<StateMachineRunId, MedicalHistory>()
|
val flowPatients = HashMap<StateMachineRunId, FlowMedicalHistory>()
|
||||||
|
val treatableSessionInits = HashMap<UUID, InternalSessionInitRecord>()
|
||||||
val recordsPublisher = PublishSubject.create<MedicalRecord>()
|
val recordsPublisher = PublishSubject.create<MedicalRecord>()
|
||||||
})
|
})
|
||||||
|
private val secureRandom = newSecureRandom()
|
||||||
|
|
||||||
class MedicalHistory {
|
/**
|
||||||
internal val records: MutableList<MedicalRecord> = mutableListOf()
|
* The node was unable to initiate the [InitialSessionMessage] from [sender].
|
||||||
|
*/
|
||||||
fun notDischargedForTheSameThingMoreThan(max: Int, by: Staff): Boolean {
|
fun sessionInitErrored(sessionMessage: InitialSessionMessage, sender: Party, event: ExternalEvent.ExternalMessageEvent, error: Throwable) {
|
||||||
val lastAdmittanceSuspendCount = (records.last() as MedicalRecord.Admitted).suspendCount
|
val time = Instant.now()
|
||||||
return records
|
val id = UUID.randomUUID()
|
||||||
.filterIsInstance<MedicalRecord.Discharged>()
|
val outcome = if (error is SessionRejectException.UnknownClass) {
|
||||||
.count { by in it.by && it.suspendCount == lastAdmittanceSuspendCount } <= max
|
// We probably don't have the CorDapp installed so let's pause the message in the hopes that the CorDapp is
|
||||||
|
// installed on restart, at which point the message will be able proceed as normal. If not then it will need
|
||||||
|
// to be dropped manually.
|
||||||
|
Outcome.OVERNIGHT_OBSERVATION
|
||||||
|
} else {
|
||||||
|
Outcome.UNTREATABLE
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun toString(): String = "${this.javaClass.simpleName}(records = $records)"
|
val record = sessionMessage.run { MedicalRecord.SessionInit(id, time, outcome, initiatorFlowClassName, flowVersion, appName, sender, error) }
|
||||||
|
mutex.locked {
|
||||||
|
if (outcome != Outcome.UNTREATABLE) {
|
||||||
|
treatableSessionInits[id] = InternalSessionInitRecord(sessionMessage, event, record)
|
||||||
|
}
|
||||||
|
recordsPublisher.onNext(record)
|
||||||
|
}
|
||||||
|
|
||||||
|
if (outcome == Outcome.UNTREATABLE) {
|
||||||
|
sendBackError(error, sessionMessage, sender, event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun sendBackError(error: Throwable, sessionMessage: InitialSessionMessage, sender: Party, event: ExternalEvent.ExternalMessageEvent) {
|
||||||
|
val message = (error as? SessionRejectException)?.message ?: "Unable to establish session"
|
||||||
|
val payload = RejectSessionMessage(message, secureRandom.nextLong())
|
||||||
|
val replyError = ExistingSessionMessage(sessionMessage.initiatorSessionId, payload)
|
||||||
|
|
||||||
|
flowMessaging.sendSessionMessage(sender, replyError, SenderDeduplicationId(DeduplicationId.createRandom(secureRandom), ourSenderUUID))
|
||||||
|
event.deduplicationHandler.afterDatabaseTransaction()
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Drop the errored session-init message with the given ID ([MedicalRecord.SessionInit.id]). This will cause the node
|
||||||
|
* to send back the relevant session error to the initiator party and acknowledge its receipt from the message broker
|
||||||
|
* so that it never gets redelivered.
|
||||||
|
*/
|
||||||
|
fun dropSessionInit(id: UUID) {
|
||||||
|
val (sessionMessage, event, publicRecord) = mutex.locked {
|
||||||
|
requireNotNull(treatableSessionInits.remove(id)) { "$id does not refer to any session init message" }
|
||||||
|
}
|
||||||
|
log.info("Errored session-init permanently dropped: $publicRecord")
|
||||||
|
sendBackError(publicRecord.error, sessionMessage, publicRecord.sender, event)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The flow running in [flowFiber] has errored.
|
* The flow running in [flowFiber] has errored.
|
||||||
*/
|
*/
|
||||||
fun flowErrored(flowFiber: FlowFiber, currentState: StateMachineState, errors: List<Throwable>) {
|
fun flowErrored(flowFiber: FlowFiber, currentState: StateMachineState, errors: List<Throwable>) {
|
||||||
|
val time = Instant.now()
|
||||||
log.info("Flow ${flowFiber.id} admitted to hospital in state $currentState")
|
log.info("Flow ${flowFiber.id} admitted to hospital in state $currentState")
|
||||||
val suspendCount = currentState.checkpoint.numberOfSuspends
|
|
||||||
|
|
||||||
val event = mutex.locked {
|
val event = mutex.locked {
|
||||||
val medicalHistory = patients.computeIfAbsent(flowFiber.id) { MedicalHistory() }
|
val medicalHistory = flowPatients.computeIfAbsent(flowFiber.id) { FlowMedicalHistory() }
|
||||||
|
|
||||||
val admitted = MedicalRecord.Admitted(flowFiber.id, Instant.now(), suspendCount)
|
|
||||||
medicalHistory.records += admitted
|
|
||||||
recordsPublisher.onNext(admitted)
|
|
||||||
|
|
||||||
val report = consultStaff(flowFiber, currentState, errors, medicalHistory)
|
val report = consultStaff(flowFiber, currentState, errors, medicalHistory)
|
||||||
|
|
||||||
val (newRecord, event) = when (report.diagnosis) {
|
val (outcome, event) = when (report.diagnosis) {
|
||||||
Diagnosis.DISCHARGE -> {
|
Diagnosis.DISCHARGE -> {
|
||||||
log.info("Flow ${flowFiber.id} error discharged from hospital by ${report.by}")
|
log.info("Flow ${flowFiber.id} error discharged from hospital by ${report.by}")
|
||||||
Pair(MedicalRecord.Discharged(flowFiber.id, Instant.now(), suspendCount, report.by, errors), Event.RetryFlowFromSafePoint)
|
Pair(Outcome.DISCHARGE, Event.RetryFlowFromSafePoint)
|
||||||
}
|
}
|
||||||
Diagnosis.OVERNIGHT_OBSERVATION -> {
|
Diagnosis.OVERNIGHT_OBSERVATION -> {
|
||||||
log.info("Flow ${flowFiber.id} error kept for overnight observation by ${report.by}")
|
log.info("Flow ${flowFiber.id} error kept for overnight observation by ${report.by}")
|
||||||
// We don't schedule a next event for the flow - it will automatically retry from its checkpoint on node restart
|
// We don't schedule a next event for the flow - it will automatically retry from its checkpoint on node restart
|
||||||
Pair(MedicalRecord.KeptInForObservation(flowFiber.id, Instant.now(), suspendCount, report.by, errors), null)
|
Pair(Outcome.OVERNIGHT_OBSERVATION, null)
|
||||||
}
|
}
|
||||||
Diagnosis.NOT_MY_SPECIALTY -> {
|
Diagnosis.NOT_MY_SPECIALTY -> {
|
||||||
// None of the staff care for these errors so we let them propagate
|
// None of the staff care for these errors so we let them propagate
|
||||||
log.info("Flow ${flowFiber.id} error allowed to propagate")
|
log.info("Flow ${flowFiber.id} error allowed to propagate")
|
||||||
Pair(MedicalRecord.NothingWeCanDo(flowFiber.id, Instant.now(), suspendCount), Event.StartErrorPropagation)
|
Pair(Outcome.UNTREATABLE, Event.StartErrorPropagation)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
medicalHistory.records += newRecord
|
val record = MedicalRecord.Flow(time, flowFiber.id, currentState.checkpoint.numberOfSuspends, errors, report.by, outcome)
|
||||||
recordsPublisher.onNext(newRecord)
|
medicalHistory.records += record
|
||||||
|
recordsPublisher.onNext(record)
|
||||||
event
|
event
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -86,8 +124,9 @@ class StaffedFlowHospital {
|
|||||||
private fun consultStaff(flowFiber: FlowFiber,
|
private fun consultStaff(flowFiber: FlowFiber,
|
||||||
currentState: StateMachineState,
|
currentState: StateMachineState,
|
||||||
errors: List<Throwable>,
|
errors: List<Throwable>,
|
||||||
medicalHistory: MedicalHistory): ConsultationReport {
|
medicalHistory: FlowMedicalHistory): ConsultationReport {
|
||||||
return errors
|
return errors
|
||||||
|
.asSequence()
|
||||||
.mapIndexed { index, error ->
|
.mapIndexed { index, error ->
|
||||||
log.info("Flow ${flowFiber.id} has error [$index]", error)
|
log.info("Flow ${flowFiber.id} has error [$index]", error)
|
||||||
val diagnoses: Map<Diagnosis, List<Staff>> = staff.groupBy { it.consult(flowFiber, currentState, error, medicalHistory) }
|
val diagnoses: Map<Diagnosis, List<Staff>> = staff.groupBy { it.consult(flowFiber, currentState, error, medicalHistory) }
|
||||||
@ -105,43 +144,61 @@ class StaffedFlowHospital {
|
|||||||
* The flow has been removed from the state machine.
|
* The flow has been removed from the state machine.
|
||||||
*/
|
*/
|
||||||
fun flowRemoved(flowId: StateMachineRunId) {
|
fun flowRemoved(flowId: StateMachineRunId) {
|
||||||
mutex.locked { patients.remove(flowId) }
|
mutex.locked { flowPatients.remove(flowId) }
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO MedicalRecord subtypes can expose the Staff class, something which we probably don't want when wiring this method to RPC
|
// TODO MedicalRecord subtypes can expose the Staff class, something which we probably don't want when wiring this method to RPC
|
||||||
/** Returns a stream of medical records as flows pass through the hospital. */
|
/** Returns a stream of medical records as flows pass through the hospital. */
|
||||||
fun track(): DataFeed<List<MedicalRecord>, MedicalRecord> {
|
fun track(): DataFeed<List<MedicalRecord>, MedicalRecord> {
|
||||||
return mutex.locked {
|
return mutex.locked {
|
||||||
DataFeed(patients.values.flatMap { it.records }, recordsPublisher.bufferUntilSubscribed())
|
val snapshot = (flowPatients.values.flatMap { it.records } + treatableSessionInits.values.map { it.publicRecord }).sortedBy { it.time }
|
||||||
|
DataFeed(snapshot, recordsPublisher.bufferUntilSubscribed())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sealed class MedicalRecord {
|
class FlowMedicalHistory {
|
||||||
abstract val flowId: StateMachineRunId
|
internal val records: MutableList<MedicalRecord.Flow> = mutableListOf()
|
||||||
abstract val at: Instant
|
|
||||||
abstract val suspendCount: Int
|
|
||||||
|
|
||||||
data class Admitted(override val flowId: StateMachineRunId,
|
fun notDischargedForTheSameThingMoreThan(max: Int, by: Staff, currentState: StateMachineState): Boolean {
|
||||||
override val at: Instant,
|
val lastAdmittanceSuspendCount = currentState.checkpoint.numberOfSuspends
|
||||||
override val suspendCount: Int) : MedicalRecord()
|
return records.count { it.outcome == Outcome.DISCHARGE && by in it.by && it.suspendCount == lastAdmittanceSuspendCount } <= max
|
||||||
|
}
|
||||||
|
|
||||||
data class Discharged(override val flowId: StateMachineRunId,
|
override fun toString(): String = "${this.javaClass.simpleName}(records = $records)"
|
||||||
override val at: Instant,
|
|
||||||
override val suspendCount: Int,
|
|
||||||
val by: List<Staff>,
|
|
||||||
val errors: List<Throwable>) : MedicalRecord()
|
|
||||||
|
|
||||||
data class KeptInForObservation(override val flowId: StateMachineRunId,
|
|
||||||
override val at: Instant,
|
|
||||||
override val suspendCount: Int,
|
|
||||||
val by: List<Staff>,
|
|
||||||
val errors: List<Throwable>) : MedicalRecord()
|
|
||||||
|
|
||||||
data class NothingWeCanDo(override val flowId: StateMachineRunId,
|
|
||||||
override val at: Instant,
|
|
||||||
override val suspendCount: Int) : MedicalRecord()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private data class InternalSessionInitRecord(val sessionMessage: InitialSessionMessage,
|
||||||
|
val event: ExternalEvent.ExternalMessageEvent,
|
||||||
|
val publicRecord: MedicalRecord.SessionInit)
|
||||||
|
|
||||||
|
sealed class MedicalRecord {
|
||||||
|
abstract val time: Instant
|
||||||
|
abstract val outcome: Outcome
|
||||||
|
abstract val errors: List<Throwable>
|
||||||
|
|
||||||
|
/** Medical record for a flow that has errored. */
|
||||||
|
data class Flow(override val time: Instant,
|
||||||
|
val flowId: StateMachineRunId,
|
||||||
|
val suspendCount: Int,
|
||||||
|
override val errors: List<Throwable>,
|
||||||
|
val by: List<Staff>,
|
||||||
|
override val outcome: Outcome) : MedicalRecord()
|
||||||
|
|
||||||
|
/** Medical record for a session initiation that was unsuccessful. */
|
||||||
|
data class SessionInit(val id: UUID,
|
||||||
|
override val time: Instant,
|
||||||
|
override val outcome: Outcome,
|
||||||
|
val initiatorFlowClassName: String,
|
||||||
|
val flowVersion: Int,
|
||||||
|
val appName: String,
|
||||||
|
val sender: Party,
|
||||||
|
val error: Throwable) : MedicalRecord() {
|
||||||
|
override val errors: List<Throwable> get() = listOf(error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
enum class Outcome { DISCHARGE, OVERNIGHT_OBSERVATION, UNTREATABLE }
|
||||||
|
|
||||||
/** The order of the enum values are in priority order. */
|
/** The order of the enum values are in priority order. */
|
||||||
enum class Diagnosis {
|
enum class Diagnosis {
|
||||||
/** Retry from last safe point. */
|
/** Retry from last safe point. */
|
||||||
@ -153,14 +210,14 @@ class StaffedFlowHospital {
|
|||||||
}
|
}
|
||||||
|
|
||||||
interface Staff {
|
interface Staff {
|
||||||
fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis
|
fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* SQL Deadlock detection.
|
* SQL Deadlock detection.
|
||||||
*/
|
*/
|
||||||
object DeadlockNurse : Staff {
|
object DeadlockNurse : Staff {
|
||||||
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis {
|
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis {
|
||||||
return if (mentionsDeadlock(newError)) {
|
return if (mentionsDeadlock(newError)) {
|
||||||
Diagnosis.DISCHARGE
|
Diagnosis.DISCHARGE
|
||||||
} else {
|
} else {
|
||||||
@ -178,8 +235,8 @@ class StaffedFlowHospital {
|
|||||||
* Primary key violation detection for duplicate inserts. Will detect other constraint violations too.
|
* Primary key violation detection for duplicate inserts. Will detect other constraint violations too.
|
||||||
*/
|
*/
|
||||||
object DuplicateInsertSpecialist : Staff {
|
object DuplicateInsertSpecialist : Staff {
|
||||||
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis {
|
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis {
|
||||||
return if (mentionsConstraintViolation(newError) && history.notDischargedForTheSameThingMoreThan(3, this)) {
|
return if (mentionsConstraintViolation(newError) && history.notDischargedForTheSameThingMoreThan(3, this, currentState)) {
|
||||||
Diagnosis.DISCHARGE
|
Diagnosis.DISCHARGE
|
||||||
} else {
|
} else {
|
||||||
Diagnosis.NOT_MY_SPECIALTY
|
Diagnosis.NOT_MY_SPECIALTY
|
||||||
@ -196,9 +253,9 @@ class StaffedFlowHospital {
|
|||||||
* exceed the limit specified by the [FlowTimeoutException].
|
* exceed the limit specified by the [FlowTimeoutException].
|
||||||
*/
|
*/
|
||||||
object DoctorTimeout : Staff {
|
object DoctorTimeout : Staff {
|
||||||
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis {
|
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis {
|
||||||
if (newError is FlowTimeoutException) {
|
if (newError is FlowTimeoutException) {
|
||||||
if (history.notDischargedForTheSameThingMoreThan(newError.maxRetries, this)) {
|
if (history.notDischargedForTheSameThingMoreThan(newError.maxRetries, this, currentState)) {
|
||||||
return Diagnosis.DISCHARGE
|
return Diagnosis.DISCHARGE
|
||||||
} else {
|
} else {
|
||||||
val errorMsg = "Maximum number of retries reached for flow ${flowFiber.snapshot().flowLogic.javaClass}. " +
|
val errorMsg = "Maximum number of retries reached for flow ${flowFiber.snapshot().flowLogic.javaClass}. " +
|
||||||
@ -216,12 +273,18 @@ class StaffedFlowHospital {
|
|||||||
* Parks [FinalityHandler]s for observation.
|
* Parks [FinalityHandler]s for observation.
|
||||||
*/
|
*/
|
||||||
object FinalityDoctor : Staff {
|
object FinalityDoctor : Staff {
|
||||||
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis {
|
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis {
|
||||||
return (currentState.flowLogic as? FinalityHandler)?.let { logic -> Diagnosis.OVERNIGHT_OBSERVATION.also { warn(logic, flowFiber, currentState) } } ?: Diagnosis.NOT_MY_SPECIALTY
|
return if (currentState.flowLogic is FinalityHandler) {
|
||||||
|
warn(currentState.flowLogic, flowFiber, currentState)
|
||||||
|
Diagnosis.OVERNIGHT_OBSERVATION
|
||||||
|
} else {
|
||||||
|
Diagnosis.NOT_MY_SPECIALTY
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun warn(flowLogic: FinalityHandler, flowFiber: FlowFiber, currentState: StateMachineState) {
|
private fun warn(flowLogic: FinalityHandler, flowFiber: FlowFiber, currentState: StateMachineState) {
|
||||||
log.warn("Flow ${flowFiber.id} failed to be finalised. Manual intervention may be required before retrying the flow by re-starting the node. State machine state: $currentState, initiating party was: ${flowLogic.sender().name}")
|
log.warn("Flow ${flowFiber.id} failed to be finalised. Manual intervention may be required before retrying " +
|
||||||
|
"the flow by re-starting the node. State machine state: $currentState, initiating party was: ${flowLogic.sender().name}")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -10,8 +10,7 @@ import net.corda.core.utilities.getOrThrow
|
|||||||
import net.corda.finance.POUNDS
|
import net.corda.finance.POUNDS
|
||||||
import net.corda.finance.contracts.asset.Cash
|
import net.corda.finance.contracts.asset.Cash
|
||||||
import net.corda.finance.issuedBy
|
import net.corda.finance.issuedBy
|
||||||
import net.corda.node.services.statemachine.StaffedFlowHospital
|
import net.corda.node.services.statemachine.StaffedFlowHospital.*
|
||||||
import net.corda.node.services.statemachine.StaffedFlowHospital.MedicalRecord.KeptInForObservation
|
|
||||||
import net.corda.testing.core.ALICE_NAME
|
import net.corda.testing.core.ALICE_NAME
|
||||||
import net.corda.testing.core.BOB_NAME
|
import net.corda.testing.core.BOB_NAME
|
||||||
import net.corda.testing.core.singleIdentity
|
import net.corda.testing.core.singleIdentity
|
||||||
@ -72,11 +71,11 @@ class FinalityHandlerTest {
|
|||||||
val keptInForObservation = smm.flowHospital
|
val keptInForObservation = smm.flowHospital
|
||||||
.track()
|
.track()
|
||||||
.let { it.updates.startWith(it.snapshot) }
|
.let { it.updates.startWith(it.snapshot) }
|
||||||
.filter { it.flowId == runId }
|
.ofType(MedicalRecord.Flow::class.java)
|
||||||
.ofType(KeptInForObservation::class.java)
|
.filter { it.flowId == runId && it.outcome == Outcome.OVERNIGHT_OBSERVATION }
|
||||||
.toBlocking()
|
.toBlocking()
|
||||||
.first()
|
.first()
|
||||||
assertThat(keptInForObservation.by).contains(StaffedFlowHospital.FinalityDoctor)
|
assertThat(keptInForObservation.by).contains(FinalityDoctor)
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun TestStartedNode.getTransaction(id: SecureHash): SignedTransaction? {
|
private fun TestStartedNode.getTransaction(id: SecureHash): SignedTransaction? {
|
||||||
|
@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Fiber
|
|||||||
import co.paralleluniverse.fibers.Suspendable
|
import co.paralleluniverse.fibers.Suspendable
|
||||||
import co.paralleluniverse.strands.Strand
|
import co.paralleluniverse.strands.Strand
|
||||||
import co.paralleluniverse.strands.concurrent.Semaphore
|
import co.paralleluniverse.strands.concurrent.Semaphore
|
||||||
|
import net.corda.client.rpc.notUsed
|
||||||
import net.corda.core.concurrent.CordaFuture
|
import net.corda.core.concurrent.CordaFuture
|
||||||
import net.corda.core.contracts.ContractState
|
import net.corda.core.contracts.ContractState
|
||||||
import net.corda.core.contracts.StateAndRef
|
import net.corda.core.contracts.StateAndRef
|
||||||
@ -68,7 +69,7 @@ class FlowFrameworkTests {
|
|||||||
@Before
|
@Before
|
||||||
fun setUpMockNet() {
|
fun setUpMockNet() {
|
||||||
mockNet = InternalMockNetwork(
|
mockNet = InternalMockNetwork(
|
||||||
cordappsForAllNodes = cordappsForPackages("net.corda.finance.contracts", "net.corda.testing.contracts"),
|
cordappsForAllNodes = cordappsForPackages("net.corda.testing.contracts"),
|
||||||
servicePeerAllocationStrategy = RoundRobin()
|
servicePeerAllocationStrategy = RoundRobin()
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -100,8 +101,8 @@ class FlowFrameworkTests {
|
|||||||
assertThat(flow.lazyTime).isNotNull()
|
assertThat(flow.lazyTime).isNotNull()
|
||||||
}
|
}
|
||||||
|
|
||||||
class SuspendThrowingActionExecutor(private val exception: Exception, val delegate: ActionExecutor) : ActionExecutor {
|
class SuspendThrowingActionExecutor(private val exception: Exception, private val delegate: ActionExecutor) : ActionExecutor {
|
||||||
var thrown = false
|
private var thrown = false
|
||||||
@Suspendable
|
@Suspendable
|
||||||
override fun executeAction(fiber: FlowFiber, action: Action) {
|
override fun executeAction(fiber: FlowFiber, action: Action) {
|
||||||
if (action is Action.CommitTransaction && !thrown) {
|
if (action is Action.CommitTransaction && !thrown) {
|
||||||
@ -367,10 +368,17 @@ class FlowFrameworkTests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `unknown class in session init`() {
|
fun `session init with unknown class is sent to the flow hospital, from where it's dropped`() {
|
||||||
aliceNode.sendSessionMessage(InitialSessionMessage(SessionId(random63BitValue()), 0, "not.a.real.Class", 1, "", null), bob)
|
aliceNode.sendSessionMessage(InitialSessionMessage(SessionId(random63BitValue()), 0, "not.a.real.Class", 1, "", null), bob)
|
||||||
mockNet.runNetwork()
|
mockNet.runNetwork()
|
||||||
assertThat(receivedSessionMessages).hasSize(2) // Only the session-init and session-reject are expected
|
assertThat(receivedSessionMessages).hasSize(1) // Only the session-init is expected as the session-reject is blocked by the flow hospital
|
||||||
|
val medicalRecords = bobNode.smm.flowHospital.track().apply { updates.notUsed() }.snapshot
|
||||||
|
assertThat(medicalRecords).hasSize(1)
|
||||||
|
val sessionInitRecord = medicalRecords[0] as StaffedFlowHospital.MedicalRecord.SessionInit
|
||||||
|
assertThat(sessionInitRecord.initiatorFlowClassName).isEqualTo("not.a.real.Class")
|
||||||
|
bobNode.smm.flowHospital.dropSessionInit(sessionInitRecord.id) // Drop the message which is processed as an error back to sender
|
||||||
|
mockNet.runNetwork()
|
||||||
|
assertThat(receivedSessionMessages).hasSize(2) // Now the session-reject is expected
|
||||||
val lastMessage = receivedSessionMessages.last().message as ExistingSessionMessage
|
val lastMessage = receivedSessionMessages.last().message as ExistingSessionMessage
|
||||||
assertThat((lastMessage.payload as RejectSessionMessage).message).isEqualTo("Don't know not.a.real.Class")
|
assertThat((lastMessage.payload as RejectSessionMessage).message).isEqualTo("Don't know not.a.real.Class")
|
||||||
}
|
}
|
||||||
@ -603,10 +611,6 @@ class FlowFrameworkTripartyTests {
|
|||||||
|
|
||||||
private val normalEnd = ExistingSessionMessage(SessionId(0), EndSessionMessage) // NormalSessionEnd(0)
|
private val normalEnd = ExistingSessionMessage(SessionId(0), EndSessionMessage) // NormalSessionEnd(0)
|
||||||
|
|
||||||
private fun assertSessionTransfers(vararg expected: SessionTransfer) {
|
|
||||||
assertThat(receivedSessionMessages).containsExactly(*expected)
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun assertSessionTransfers(node: TestStartedNode, vararg expected: SessionTransfer): List<SessionTransfer> {
|
private fun assertSessionTransfers(node: TestStartedNode, vararg expected: SessionTransfer): List<SessionTransfer> {
|
||||||
val actualForNode = receivedSessionMessages.filter { it.from == node.internals.id || it.to == node.network.myAddress }
|
val actualForNode = receivedSessionMessages.filter { it.from == node.internals.id || it.to == node.network.myAddress }
|
||||||
assertThat(actualForNode).containsExactly(*expected)
|
assertThat(actualForNode).containsExactly(*expected)
|
||||||
@ -749,16 +753,6 @@ class FlowFrameworkPersistenceTests {
|
|||||||
return newNode.getSingleFlow<P>().first
|
return newNode.getSingleFlow<P>().first
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun assertSessionTransfers(vararg expected: SessionTransfer) {
|
|
||||||
assertThat(receivedSessionMessages).containsExactly(*expected)
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun assertSessionTransfers(node: TestStartedNode, vararg expected: SessionTransfer): List<SessionTransfer> {
|
|
||||||
val actualForNode = receivedSessionMessages.filter { it.from == node.internals.id || it.to == node.network.myAddress }
|
|
||||||
assertThat(actualForNode).containsExactly(*expected)
|
|
||||||
return actualForNode
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun receivedSessionMessagesObservable(): Observable<SessionTransfer> {
|
private fun receivedSessionMessagesObservable(): Observable<SessionTransfer> {
|
||||||
return mockNet.messagingNetwork.receivedMessages.toSessionTransfers()
|
return mockNet.messagingNetwork.receivedMessages.toSessionTransfers()
|
||||||
}
|
}
|
||||||
@ -832,19 +826,6 @@ private val FlowLogic<*>.progressSteps: CordaFuture<List<Notification<ProgressTr
|
|||||||
.toFuture()
|
.toFuture()
|
||||||
}
|
}
|
||||||
|
|
||||||
class ThrowingActionExecutor(private val exception: Exception, val delegate: ActionExecutor) : ActionExecutor {
|
|
||||||
var thrown = false
|
|
||||||
@Suspendable
|
|
||||||
override fun executeAction(fiber: FlowFiber, action: Action) {
|
|
||||||
if (thrown) {
|
|
||||||
delegate.executeAction(fiber, action)
|
|
||||||
} else {
|
|
||||||
thrown = true
|
|
||||||
throw exception
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@InitiatingFlow
|
@InitiatingFlow
|
||||||
private class WaitForOtherSideEndBeforeSendAndReceive(val otherParty: Party,
|
private class WaitForOtherSideEndBeforeSendAndReceive(val otherParty: Party,
|
||||||
@Transient val receivedOtherFlowEnd: Semaphore) : FlowLogic<Unit>() {
|
@Transient val receivedOtherFlowEnd: Semaphore) : FlowLogic<Unit>() {
|
||||||
|
@ -175,8 +175,6 @@ class RetryFlowMockTest {
|
|||||||
TODO("not implemented")
|
TODO("not implemented")
|
||||||
}
|
}
|
||||||
}), nodeA.services.newContext()).get()
|
}), nodeA.services.newContext()).get()
|
||||||
// Should be 2 records, one for admission and one for keep in.
|
|
||||||
records.next()
|
|
||||||
records.next()
|
records.next()
|
||||||
// Killing it should remove it.
|
// Killing it should remove it.
|
||||||
nodeA.smm.killFlow(flow.id)
|
nodeA.smm.killFlow(flow.id)
|
||||||
|
Reference in New Issue
Block a user