Merged in mnesbit-cor-389-deduplicate-artemis-messages (pull request )

Track message id's to deduplicate replays using JDBC Persistent Set
This commit is contained in:
Matthew Nesbit 2016-09-22 15:58:28 +01:00
commit 193c965db8
4 changed files with 63 additions and 37 deletions
node/src
main/kotlin/com/r3corda/node
test/kotlin/com/r3corda/node/services

@ -163,6 +163,10 @@ class ArtemisMessagingServer(config: NodeConfiguration,
config.acceptorConfigurations = setOf( config.acceptorConfigurations = setOf(
tcpTransport(ConnectionDirection.INBOUND, "0.0.0.0", hp.port) tcpTransport(ConnectionDirection.INBOUND, "0.0.0.0", hp.port)
) )
// Enable built in message deduplication. Note we still have to do our own as the delayed commits
// and our own definition of commit mean that the built in deduplication cannot remove all duplicates.
config.idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess
config.isPersistIDCache = true
return config return config
} }
@ -198,6 +202,7 @@ class ArtemisMessagingServer(config: NodeConfiguration,
forwardingAddress = nameStr forwardingAddress = nameStr
staticConnectors = listOf(hostAndPort.toString()) staticConnectors = listOf(hostAndPort.toString())
confirmationWindowSize = 100000 // a guess confirmationWindowSize = 100000 // a guess
isUseDuplicateDetection = true // Enable the bridges automatic deduplication logic
}) })
} }

@ -6,9 +6,11 @@ import com.r3corda.core.messaging.*
import com.r3corda.core.serialization.SerializedBytes import com.r3corda.core.serialization.SerializedBytes
import com.r3corda.core.serialization.opaque import com.r3corda.core.serialization.opaque
import com.r3corda.core.utilities.loggerFor import com.r3corda.core.utilities.loggerFor
import com.r3corda.core.utilities.trace
import com.r3corda.node.services.api.MessagingServiceInternal import com.r3corda.node.services.api.MessagingServiceInternal
import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.utilities.AffinityExecutor import com.r3corda.node.utilities.AffinityExecutor
import com.r3corda.node.utilities.JDBCHashSet
import com.r3corda.node.utilities.databaseTransaction import com.r3corda.node.utilities.databaseTransaction
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.SimpleString
@ -16,6 +18,7 @@ import org.apache.activemq.artemis.api.core.client.*
import java.nio.file.FileSystems import java.nio.file.FileSystems
import java.security.PublicKey import java.security.PublicKey
import java.time.Instant import java.time.Instant
import java.util.*
import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor import java.util.concurrent.Executor
@ -82,7 +85,7 @@ class NodeMessagingClient(config: NodeConfiguration,
var rpcNotificationConsumer: ClientConsumer? = null var rpcNotificationConsumer: ClientConsumer? = null
// TODO: This is not robust and needs to be replaced by more intelligently using the message queue server. // TODO: This is not robust and needs to be replaced by more intelligently using the message queue server.
var undeliveredMessages = listOf<Message>() var undeliveredMessages = listOf<Pair<Message, UUID>>()
} }
/** A registration to handle messages of different types */ /** A registration to handle messages of different types */
@ -97,6 +100,11 @@ class NodeMessagingClient(config: NodeConfiguration,
private val state = ThreadBox(InnerState()) private val state = ThreadBox(InnerState())
private val handlers = CopyOnWriteArrayList<Handler>() private val handlers = CopyOnWriteArrayList<Handler>()
private val processedMessages: MutableSet<UUID> = Collections.synchronizedSet(if (persistentInbox) {
JDBCHashSet<UUID>("message_id", loadOnInit = true)
} else {
HashSet<UUID>()
})
init { init {
require(config.basedir.fileSystem == FileSystems.getDefault()) { "Artemis only uses the default file system" } require(config.basedir.fileSystem == FileSystems.getDefault()) { "Artemis only uses the default file system" }
@ -113,8 +121,9 @@ class NodeMessagingClient(config: NodeConfiguration,
val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport) val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport)
clientFactory = locator.createSessionFactory() clientFactory = locator.createSessionFactory()
// Create a session and configure to commit manually after each acknowledge. (N.B. ackBatchSize is in Bytes!!!) // Create a session. Note that the acknowledgement of messages is not flushed to
val session = clientFactory!!.createSession(true, true, 1) // the DB until the default buffer size of 1MB is acknowledged.
val session = clientFactory!!.createSession(true, true, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)
this.session = session this.session = session
session.start() session.start()
@ -170,9 +179,11 @@ class NodeMessagingClient(config: NodeConfiguration,
null null
} ?: break } ?: break
val message: Message? = artemisToCordaMessage(artemisMessage) // Use the magic deduplication property built into Artemis as our message identity too
val uuid = UUID.fromString(artemisMessage.getStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID))
val message: Message? = artemisToCordaMessage(artemisMessage, uuid)
if (message != null) if (message != null)
deliver(message) deliver(message, uuid)
// Ack the message so it won't be redelivered. We should only really do this when there were no // Ack the message so it won't be redelivered. We should only really do this when there were no
// transient failures. If we caught an exception in the handler, we could back off and retry delivery // transient failures. If we caught an exception in the handler, we could back off and retry delivery
@ -192,7 +203,7 @@ class NodeMessagingClient(config: NodeConfiguration,
shutdownLatch.countDown() shutdownLatch.countDown()
} }
private fun artemisToCordaMessage(message: ClientMessage): Message? { private fun artemisToCordaMessage(message: ClientMessage, uuid: UUID): Message? {
try { try {
if (!message.containsProperty(TOPIC_PROPERTY)) { if (!message.containsProperty(TOPIC_PROPERTY)) {
log.warn("Received message without a $TOPIC_PROPERTY property, ignoring") log.warn("Received message without a $TOPIC_PROPERTY property, ignoring")
@ -204,7 +215,7 @@ class NodeMessagingClient(config: NodeConfiguration,
} }
val topic = message.getStringProperty(TOPIC_PROPERTY) val topic = message.getStringProperty(TOPIC_PROPERTY)
val sessionID = message.getLongProperty(SESSION_ID_PROPERTY) val sessionID = message.getLongProperty(SESSION_ID_PROPERTY)
log.info("received message from: ${message.address} topic: $topic sessionID: $sessionID") log.info("received message from: ${message.address} topic: $topic sessionID: $sessionID uuid: $uuid")
val body = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) } val body = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }
@ -224,7 +235,7 @@ class NodeMessagingClient(config: NodeConfiguration,
} }
} }
private fun deliver(msg: Message): Boolean { private fun deliver(msg: Message, uuid: UUID): Boolean {
state.checkNotLocked() state.checkNotLocked()
// Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added // Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added
// or removed whilst the filter is executing will not affect anything. // or removed whilst the filter is executing will not affect anything.
@ -238,12 +249,11 @@ class NodeMessagingClient(config: NodeConfiguration,
// This is a hack; transient messages held in memory isn't crash resistant. // This is a hack; transient messages held in memory isn't crash resistant.
// TODO: Use Artemis API more effectively so we don't pop messages off a queue that we aren't ready to use. // TODO: Use Artemis API more effectively so we don't pop messages off a queue that we aren't ready to use.
state.locked { state.locked {
undeliveredMessages += msg undeliveredMessages += Pair(msg, uuid)
} }
return false return false
} }
for (handler in deliverTo) {
try { try {
// This will perform a BLOCKING call onto the executor. Thus if the handlers are slow, we will // This will perform a BLOCKING call onto the executor. Thus if the handlers are slow, we will
// be slow, and Artemis can handle that case intelligently. We don't just invoke the handler // be slow, and Artemis can handle that case intelligently. We don't just invoke the handler
@ -258,19 +268,30 @@ class NodeMessagingClient(config: NodeConfiguration,
// interpret persistent as "server" and non-persistent as "client". // interpret persistent as "server" and non-persistent as "client".
if (persistentInbox) { if (persistentInbox) {
databaseTransaction { databaseTransaction {
handler.callback(msg, handler) callHandlers(msg, uuid, deliverTo)
} }
} else { } else {
handler.callback(msg, handler) callHandlers(msg, uuid, deliverTo)
} }
} }
} catch(e: Exception) { } catch(e: Exception) {
log.error("Caught exception whilst executing message handler for ${msg.topicSession}", e) log.error("Caught exception whilst executing message handler for ${msg.topicSession}", e)
} }
}
return true return true
} }
private fun callHandlers(msg: Message, uuid: UUID, deliverTo: List<Handler>) {
if (uuid in processedMessages) {
log.trace { "discard duplicate message $uuid for ${msg.topicSession}" }
return
}
for (handler in deliverTo) {
handler.callback(msg, handler)
}
// TODO We will at some point need to decide a trimming policy for the id's
processedMessages += uuid
}
override fun stop() { override fun stop() {
val running = state.locked { val running = state.locked {
// We allow stop() to be called without a run() in between, but it must have at least been started. // We allow stop() to be called without a run() in between, but it must have at least been started.
@ -311,18 +332,21 @@ class NodeMessagingClient(config: NodeConfiguration,
override fun send(message: Message, target: MessageRecipients) { override fun send(message: Message, target: MessageRecipients) {
val queueName = toQueueName(target) val queueName = toQueueName(target)
val uuid = UUID.randomUUID()
state.locked { state.locked {
val artemisMessage = session!!.createMessage(true).apply { val artemisMessage = session!!.createMessage(true).apply {
val sessionID = message.topicSession.sessionID val sessionID = message.topicSession.sessionID
putStringProperty(TOPIC_PROPERTY, message.topicSession.topic) putStringProperty(TOPIC_PROPERTY, message.topicSession.topic)
putLongProperty(SESSION_ID_PROPERTY, sessionID) putLongProperty(SESSION_ID_PROPERTY, sessionID)
writeBodyBufferBytes(message.data) writeBodyBufferBytes(message.data)
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(uuid.toString()))
} }
if (knownQueues.add(queueName)) { if (knownQueues.add(queueName)) {
maybeCreateQueue(queueName) maybeCreateQueue(queueName)
} }
log.info("send to: $queueName topic: ${message.topicSession.topic} sessionID: ${message.topicSession.sessionID}") log.info("send to: $queueName topic: ${message.topicSession.topic} sessionID: ${message.topicSession.sessionID} uuid: $uuid")
producer!!.send(queueName, artemisMessage) producer!!.send(queueName, artemisMessage)
} }
} }
@ -352,7 +376,7 @@ class NodeMessagingClient(config: NodeConfiguration,
undeliveredMessages = listOf() undeliveredMessages = listOf()
messagesToRedeliver messagesToRedeliver
} }
messagesToRedeliver.forEach { deliver(it) } messagesToRedeliver.forEach { deliver(it.first, it.second) }
return handler return handler
} }

@ -88,7 +88,7 @@ class JDBCHashSet<K : Any>(tableName: String, loadOnInit: Boolean = false) : Abs
* *
* See [AbstractJDBCHashMap] for implementation details. * See [AbstractJDBCHashMap] for implementation details.
*/ */
abstract class AbstractJDBCHashSet<K : Any, T : JDBCHashedTable>(table: T, loadOnInit: Boolean = false) : MutableSet<K>, AbstractSet<K>() { abstract class AbstractJDBCHashSet<K : Any, T : JDBCHashedTable>(protected val table: T, loadOnInit: Boolean = false) : MutableSet<K>, AbstractSet<K>() {
protected val innerMap = object : AbstractJDBCHashMap<K, Unit, T>(table, loadOnInit) { protected val innerMap = object : AbstractJDBCHashMap<K, Unit, T>(table, loadOnInit) {
override fun keyFromRow(it: ResultRow): K = this@AbstractJDBCHashSet.elementFromRow(it) override fun keyFromRow(it: ResultRow): K = this@AbstractJDBCHashSet.elementFromRow(it)
@ -104,9 +104,6 @@ abstract class AbstractJDBCHashSet<K : Any, T : JDBCHashedTable>(table: T, loadO
} }
protected val table: T
get() = innerMap.table
override fun add(element: K): Boolean { override fun add(element: K): Boolean {
if (innerMap.containsKey(element)) { if (innerMap.containsKey(element)) {
return false return false

@ -118,7 +118,7 @@ class ArtemisMessagingTests {
} }
private fun createMessagingClient(server: HostAndPort = hostAndPort): NodeMessagingClient { private fun createMessagingClient(server: HostAndPort = hostAndPort): NodeMessagingClient {
return NodeMessagingClient(config, server, identity.public, AffinityExecutor.SAME_THREAD).apply { return NodeMessagingClient(config, server, identity.public, AffinityExecutor.SAME_THREAD, false).apply {
configureWithDevSSLCertificate() configureWithDevSSLCertificate()
messagingClient = this messagingClient = this
} }