node: Fix redelivery race

This commit is contained in:
Andras Slemmer
2016-08-24 15:27:35 +01:00
parent 5dfd3c7704
commit 9ff1ad7769

View File

@ -70,6 +70,9 @@ class ArtemisMessagingClient(directory: Path,
var consumer: ClientConsumer? = null var consumer: ClientConsumer? = null
var session: ClientSession? = null var session: ClientSession? = null
var clientFactory: ClientSessionFactory? = null var clientFactory: ClientSessionFactory? = null
// TODO: This is not robust and needs to be replaced by more intelligently using the message queue server.
var undeliveredMessages = listOf<Message>()
} }
/** A registration to handle messages of different types */ /** A registration to handle messages of different types */
@ -85,9 +88,6 @@ class ArtemisMessagingClient(directory: Path,
private val state = ThreadBox(InnerState()) private val state = ThreadBox(InnerState())
private val handlers = CopyOnWriteArrayList<Handler>() private val handlers = CopyOnWriteArrayList<Handler>()
// TODO: This is not robust and needs to be replaced by more intelligently using the message queue server.
private var undeliveredMessages = listOf<Message>()
init { init {
require(directory.fileSystem == FileSystems.getDefault()) { "Artemis only uses the default file system" } require(directory.fileSystem == FileSystems.getDefault()) { "Artemis only uses the default file system" }
} }
@ -214,7 +214,9 @@ class ArtemisMessagingClient(directory: Path,
// This is a hack; transient messages held in memory isn't crash resistant. // This is a hack; transient messages held in memory isn't crash resistant.
// TODO: Use Artemis API more effectively so we don't pop messages off a queue that we aren't ready to use. // TODO: Use Artemis API more effectively so we don't pop messages off a queue that we aren't ready to use.
state.locked {
undeliveredMessages += msg undeliveredMessages += msg
}
return false return false
} }
@ -312,8 +314,11 @@ class ArtemisMessagingClient(directory: Path,
require(!topicSession.isBlank()) { "Topic must not be blank, as the empty topic is a special case." } require(!topicSession.isBlank()) { "Topic must not be blank, as the empty topic is a special case." }
val handler = Handler(executor, topicSession, callback) val handler = Handler(executor, topicSession, callback)
handlers.add(handler) handlers.add(handler)
val messagesToRedeliver = state.locked {
val messagesToRedeliver = undeliveredMessages val messagesToRedeliver = undeliveredMessages
undeliveredMessages = listOf() undeliveredMessages = listOf()
messagesToRedeliver
}
messagesToRedeliver.forEach { deliver(it) } messagesToRedeliver.forEach { deliver(it) }
return handler return handler
} }