Ack unhandled messages when recipient flow is shut or shutting down (#335)

This commit is contained in:
Andras Slemmer
2018-01-10 17:55:08 +00:00
committed by Rick Parker
parent 4caf6d92ea
commit 2921b8044d
4 changed files with 88 additions and 0 deletions

View File

@ -324,6 +324,7 @@ class StateMachineManagerImpl(
val recipientId = sessionMessage.recipientSessionId
val flowId = sessionToFlow[recipientId]
if (flowId == null) {
acknowledgeHandle.acknowledge()
if (sessionMessage.payload is EndSessionMessage) {
logger.debug {
"Got ${EndSessionMessage::class.java.simpleName} for " +
@ -610,6 +611,7 @@ class StateMachineManagerImpl(
removalReason: FlowRemovalReason.OrderlyFinish,
lastState: StateMachineState
) {
drainFlowEventQueue(flow)
// final sanity checks
require(lastState.unacknowledgedMessages.isEmpty())
require(lastState.isRemoved)
@ -625,6 +627,7 @@ class StateMachineManagerImpl(
removalReason: FlowRemovalReason.ErrorFinish,
lastState: StateMachineState
) {
drainFlowEventQueue(flow)
val flowError = removalReason.flowErrors[0] // TODO what to do with several?
val exception = flowError.exception
(exception as? FlowException)?.originalErrorId = flowError.errorId
@ -632,6 +635,30 @@ class StateMachineManagerImpl(
lastState.flowLogic.progressTracker?.endWithError(exception)
changesPublisher.onNext(StateMachineManager.Change.Removed(lastState.flowLogic, Try.Failure<Nothing>(exception)))
}
// The flow's event queue may be non-empty in case it shut down abruptly. We handle outstanding events here.
private fun drainFlowEventQueue(flow: Flow) {
while (true) {
val event = flow.fiber.transientValues!!.value.eventQueue.tryReceive() ?: return
when (event) {
is Event.DeliverSessionMessage -> {
// Acknowledge the message so it doesn't leak in the broker.
event.acknowledgeHandle.acknowledge()
when (event.sessionMessage.payload) {
EndSessionMessage -> {
logger.debug { "Unhandled message ${event.sessionMessage} due to flow shutting down" }
}
else -> {
logger.warn("Unhandled message ${event.sessionMessage} due to flow shutting down")
}
}
}
else -> {
logger.warn("Unhandled event $event due to flow shutting down")
}
}
}
}
}
class SessionRejectException(reason: String) : CordaException(reason)