Merged in parkri-fix-pending-messages-table-name (pull request #426)

Pending redelivery table name prefix added in line with other tables.
This commit is contained in:
Rick Parker 2016-11-04 10:34:17 +00:00
commit 4b1d971137

View File

@ -83,7 +83,24 @@ class NodeMessagingClient(override val config: NodeConfiguration,
var rpcConsumer: ClientConsumer? = null
var rpcNotificationConsumer: ClientConsumer? = null
var pendingRedelivery = JDBCHashSet<Message>("pending_messages",loadOnInit = true)
private object Table : JDBCHashedTable("${NODE_DATABASE_PREFIX}pending_messages") {
val uuid = uuidString("message_id")
val message = blob("message")
}
val pendingRedelivery = object : AbstractJDBCHashMap<UUID, Message, Table>(Table, loadOnInit = false) {
override fun keyFromRow(row: ResultRow): UUID = row[table.uuid]
override fun valueFromRow(row: ResultRow): Message = deserializeFromBlob(row[table.message])
override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry<UUID, Message>, finalizables: MutableList<() -> Unit>) {
insert[table.uuid] = entry.key
}
override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry<UUID, Message>, finalizables: MutableList<() -> Unit>) {
insert[table.message] = serializeToBlob(entry.value, finalizables)
}
}
}
/** A registration to handle messages of different types */
@ -235,20 +252,20 @@ class NodeMessagingClient(override val config: NodeConfiguration,
}
}
private fun deliver(msg: Message): Boolean {
private fun deliver(msg: Message, redelivery: Boolean = false): Boolean {
state.checkNotLocked()
// Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added
// or removed whilst the filter is executing will not affect anything.
val deliverTo = handlers.filter { it.topicSession.isBlank() || it.topicSession == msg.topicSession }
if (deliverTo.isEmpty()) {
if (deliverTo.isEmpty() && !redelivery) {
// This should probably be downgraded to a trace in future, so the protocol can evolve with new topics
// without causing log spam.
log.warn("Received message for ${msg.topicSession} that doesn't have any registered handlers yet")
state.locked {
databaseTransaction(database) {
pendingRedelivery.add(msg)
pendingRedelivery[msg.uniqueMessageId] = msg
}
}
return false
@ -265,7 +282,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
// start/run/stop have re-entrancy assertions at the top, so it is OK.
executor.fetchFrom {
databaseTransaction(database) {
callHandlers(msg, deliverTo)
callHandlers(msg, deliverTo, redelivery)
}
}
} catch(e: Exception) {
@ -274,7 +291,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
return true
}
private fun callHandlers(msg: Message, deliverTo: List<Handler>) {
private fun callHandlers(msg: Message, deliverTo: List<Handler>, redelivery: Boolean) {
if (msg.uniqueMessageId in processedMessages) {
log.trace { "discard duplicate message ${msg.uniqueMessageId} for ${msg.topicSession}" }
return
@ -284,6 +301,9 @@ class NodeMessagingClient(override val config: NodeConfiguration,
}
// TODO We will at some point need to decide a trimming policy for the id's
processedMessages += msg.uniqueMessageId
if (redelivery) state.locked {
pendingRedelivery.remove(msg.uniqueMessageId)
}
}
override fun stop() {
@ -365,12 +385,11 @@ class NodeMessagingClient(override val config: NodeConfiguration,
val messagesToRedeliver = state.locked {
val pending = ArrayList<Message>()
databaseTransaction(database) {
pending.addAll(pendingRedelivery)
pendingRedelivery.clear()
pending.addAll(pendingRedelivery.values)
}
pending
}
messagesToRedeliver.forEach { deliver(it) }
messagesToRedeliver.forEach { deliver(it, true) }
return handler
}