Pending redelivery table name prefix added in line with other tables.

No longer delete all pending messages from the database and move into memory before putting back into the database again, which is vulnerable to failure.  Now we delete individual pending messages on successful redelivery.
This commit is contained in:
rick.parker 2016-11-01 10:09:29 +00:00
parent 226b624004
commit 143e3a063f

View File

@ -83,7 +83,24 @@ class NodeMessagingClient(override val config: NodeConfiguration,
var rpcConsumer: ClientConsumer? = null var rpcConsumer: ClientConsumer? = null
var rpcNotificationConsumer: ClientConsumer? = null var rpcNotificationConsumer: ClientConsumer? = null
var pendingRedelivery = JDBCHashSet<Message>("pending_messages",loadOnInit = true) private object Table : JDBCHashedTable("${NODE_DATABASE_PREFIX}pending_messages") {
val uuid = uuidString("message_id")
val message = blob("message")
}
val pendingRedelivery = object : AbstractJDBCHashMap<UUID, Message, Table>(Table, loadOnInit = false) {
override fun keyFromRow(row: ResultRow): UUID = row[table.uuid]
override fun valueFromRow(row: ResultRow): Message = deserializeFromBlob(row[table.message])
override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry<UUID, Message>, finalizables: MutableList<() -> Unit>) {
insert[table.uuid] = entry.key
}
override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry<UUID, Message>, finalizables: MutableList<() -> Unit>) {
insert[table.message] = serializeToBlob(entry.value, finalizables)
}
}
} }
/** A registration to handle messages of different types */ /** A registration to handle messages of different types */
@ -235,20 +252,20 @@ class NodeMessagingClient(override val config: NodeConfiguration,
} }
} }
private fun deliver(msg: Message): Boolean { private fun deliver(msg: Message, redelivery: Boolean = false): Boolean {
state.checkNotLocked() state.checkNotLocked()
// Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added // Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added
// or removed whilst the filter is executing will not affect anything. // or removed whilst the filter is executing will not affect anything.
val deliverTo = handlers.filter { it.topicSession.isBlank() || it.topicSession == msg.topicSession } val deliverTo = handlers.filter { it.topicSession.isBlank() || it.topicSession == msg.topicSession }
if (deliverTo.isEmpty()) { if (deliverTo.isEmpty() && !redelivery) {
// This should probably be downgraded to a trace in future, so the protocol can evolve with new topics // This should probably be downgraded to a trace in future, so the protocol can evolve with new topics
// without causing log spam. // without causing log spam.
log.warn("Received message for ${msg.topicSession} that doesn't have any registered handlers yet") log.warn("Received message for ${msg.topicSession} that doesn't have any registered handlers yet")
state.locked { state.locked {
databaseTransaction(database) { databaseTransaction(database) {
pendingRedelivery.add(msg) pendingRedelivery[msg.uniqueMessageId] = msg
} }
} }
return false return false
@ -265,7 +282,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
// start/run/stop have re-entrancy assertions at the top, so it is OK. // start/run/stop have re-entrancy assertions at the top, so it is OK.
executor.fetchFrom { executor.fetchFrom {
databaseTransaction(database) { databaseTransaction(database) {
callHandlers(msg, deliverTo) callHandlers(msg, deliverTo, redelivery)
} }
} }
} catch(e: Exception) { } catch(e: Exception) {
@ -274,7 +291,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
return true return true
} }
private fun callHandlers(msg: Message, deliverTo: List<Handler>) { private fun callHandlers(msg: Message, deliverTo: List<Handler>, redelivery: Boolean) {
if (msg.uniqueMessageId in processedMessages) { if (msg.uniqueMessageId in processedMessages) {
log.trace { "discard duplicate message ${msg.uniqueMessageId} for ${msg.topicSession}" } log.trace { "discard duplicate message ${msg.uniqueMessageId} for ${msg.topicSession}" }
return return
@ -284,6 +301,9 @@ class NodeMessagingClient(override val config: NodeConfiguration,
} }
// TODO We will at some point need to decide a trimming policy for the id's // TODO We will at some point need to decide a trimming policy for the id's
processedMessages += msg.uniqueMessageId processedMessages += msg.uniqueMessageId
if (redelivery) state.locked {
pendingRedelivery.remove(msg.uniqueMessageId)
}
} }
override fun stop() { override fun stop() {
@ -365,12 +385,11 @@ class NodeMessagingClient(override val config: NodeConfiguration,
val messagesToRedeliver = state.locked { val messagesToRedeliver = state.locked {
val pending = ArrayList<Message>() val pending = ArrayList<Message>()
databaseTransaction(database) { databaseTransaction(database) {
pending.addAll(pendingRedelivery) pending.addAll(pendingRedelivery.values)
pendingRedelivery.clear()
} }
pending pending
} }
messagesToRedeliver.forEach { deliver(it) } messagesToRedeliver.forEach { deliver(it, true) }
return handler return handler
} }