diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/NodeMessagingClient.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/NodeMessagingClient.kt index f69c528f1c..dbb7a36236 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/messaging/NodeMessagingClient.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/NodeMessagingClient.kt @@ -83,7 +83,24 @@ class NodeMessagingClient(override val config: NodeConfiguration, var rpcConsumer: ClientConsumer? = null var rpcNotificationConsumer: ClientConsumer? = null - var pendingRedelivery = JDBCHashSet("pending_messages",loadOnInit = true) + private object Table : JDBCHashedTable("${NODE_DATABASE_PREFIX}pending_messages") { + val uuid = uuidString("message_id") + val message = blob("message") + } + + val pendingRedelivery = object : AbstractJDBCHashMap(Table, loadOnInit = false) { + override fun keyFromRow(row: ResultRow): UUID = row[table.uuid] + + override fun valueFromRow(row: ResultRow): Message = deserializeFromBlob(row[table.message]) + + override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry, finalizables: MutableList<() -> Unit>) { + insert[table.uuid] = entry.key + } + + override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry, finalizables: MutableList<() -> Unit>) { + insert[table.message] = serializeToBlob(entry.value, finalizables) + } + } } /** A registration to handle messages of different types */ @@ -235,20 +252,20 @@ class NodeMessagingClient(override val config: NodeConfiguration, } } - private fun deliver(msg: Message): Boolean { + private fun deliver(msg: Message, redelivery: Boolean = false): Boolean { state.checkNotLocked() // Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added // or removed whilst the filter is executing will not affect anything. val deliverTo = handlers.filter { it.topicSession.isBlank() || it.topicSession == msg.topicSession } - if (deliverTo.isEmpty()) { + if (deliverTo.isEmpty() && !redelivery) { // This should probably be downgraded to a trace in future, so the protocol can evolve with new topics // without causing log spam. log.warn("Received message for ${msg.topicSession} that doesn't have any registered handlers yet") state.locked { databaseTransaction(database) { - pendingRedelivery.add(msg) + pendingRedelivery[msg.uniqueMessageId] = msg } } return false @@ -265,7 +282,7 @@ class NodeMessagingClient(override val config: NodeConfiguration, // start/run/stop have re-entrancy assertions at the top, so it is OK. executor.fetchFrom { databaseTransaction(database) { - callHandlers(msg, deliverTo) + callHandlers(msg, deliverTo, redelivery) } } } catch(e: Exception) { @@ -274,7 +291,7 @@ class NodeMessagingClient(override val config: NodeConfiguration, return true } - private fun callHandlers(msg: Message, deliverTo: List) { + private fun callHandlers(msg: Message, deliverTo: List, redelivery: Boolean) { if (msg.uniqueMessageId in processedMessages) { log.trace { "discard duplicate message ${msg.uniqueMessageId} for ${msg.topicSession}" } return @@ -284,6 +301,9 @@ class NodeMessagingClient(override val config: NodeConfiguration, } // TODO We will at some point need to decide a trimming policy for the id's processedMessages += msg.uniqueMessageId + if (redelivery) state.locked { + pendingRedelivery.remove(msg.uniqueMessageId) + } } override fun stop() { @@ -365,12 +385,11 @@ class NodeMessagingClient(override val config: NodeConfiguration, val messagesToRedeliver = state.locked { val pending = ArrayList() databaseTransaction(database) { - pending.addAll(pendingRedelivery) - pendingRedelivery.clear() + pending.addAll(pendingRedelivery.values) } pending } - messagesToRedeliver.forEach { deliver(it) } + messagesToRedeliver.forEach { deliver(it, true) } return handler }