mirror of
https://github.com/corda/corda.git
synced 2025-05-31 14:40:52 +00:00
Track message id's to deduplicate replays. Widen the auto-acknowledgement window of Artemis back to the default.
Use synchronized wrapper over set. Drop discard message to trace level logging. Fix code layout Use lazy trace extension method
This commit is contained in:
parent
e2c793df05
commit
074964f919
@ -163,6 +163,10 @@ class ArtemisMessagingServer(config: NodeConfiguration,
|
||||
config.acceptorConfigurations = setOf(
|
||||
tcpTransport(ConnectionDirection.INBOUND, "0.0.0.0", hp.port)
|
||||
)
|
||||
// Enable built in message deduplication. Note we still have to do our own as the delayed commits
|
||||
// and our own definition of commit mean that the built in deduplication cannot remove all duplicates.
|
||||
config.idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess
|
||||
config.isPersistIDCache = true
|
||||
return config
|
||||
}
|
||||
|
||||
@ -198,6 +202,7 @@ class ArtemisMessagingServer(config: NodeConfiguration,
|
||||
forwardingAddress = nameStr
|
||||
staticConnectors = listOf(hostAndPort.toString())
|
||||
confirmationWindowSize = 100000 // a guess
|
||||
isUseDuplicateDetection = true // Enable the bridges automatic deduplication logic
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -6,9 +6,11 @@ import com.r3corda.core.messaging.*
|
||||
import com.r3corda.core.serialization.SerializedBytes
|
||||
import com.r3corda.core.serialization.opaque
|
||||
import com.r3corda.core.utilities.loggerFor
|
||||
import com.r3corda.core.utilities.trace
|
||||
import com.r3corda.node.services.api.MessagingServiceInternal
|
||||
import com.r3corda.node.services.config.NodeConfiguration
|
||||
import com.r3corda.node.utilities.AffinityExecutor
|
||||
import com.r3corda.node.utilities.JDBCHashSet
|
||||
import com.r3corda.node.utilities.databaseTransaction
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
@ -16,6 +18,7 @@ import org.apache.activemq.artemis.api.core.client.*
|
||||
import java.nio.file.FileSystems
|
||||
import java.security.PublicKey
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.concurrent.CopyOnWriteArrayList
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.Executor
|
||||
@ -82,7 +85,7 @@ class NodeMessagingClient(config: NodeConfiguration,
|
||||
var rpcNotificationConsumer: ClientConsumer? = null
|
||||
|
||||
// TODO: This is not robust and needs to be replaced by more intelligently using the message queue server.
|
||||
var undeliveredMessages = listOf<Message>()
|
||||
var undeliveredMessages = listOf<Pair<Message, UUID>>()
|
||||
}
|
||||
|
||||
/** A registration to handle messages of different types */
|
||||
@ -97,6 +100,11 @@ class NodeMessagingClient(config: NodeConfiguration,
|
||||
|
||||
private val state = ThreadBox(InnerState())
|
||||
private val handlers = CopyOnWriteArrayList<Handler>()
|
||||
private val processedMessages: MutableSet<UUID> = Collections.synchronizedSet(if (persistentInbox) {
|
||||
JDBCHashSet<UUID>("message_id", loadOnInit = true)
|
||||
} else {
|
||||
HashSet<UUID>()
|
||||
})
|
||||
|
||||
init {
|
||||
require(config.basedir.fileSystem == FileSystems.getDefault()) { "Artemis only uses the default file system" }
|
||||
@ -113,8 +121,9 @@ class NodeMessagingClient(config: NodeConfiguration,
|
||||
val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport)
|
||||
clientFactory = locator.createSessionFactory()
|
||||
|
||||
// Create a session and configure to commit manually after each acknowledge. (N.B. ackBatchSize is in Bytes!!!)
|
||||
val session = clientFactory!!.createSession(true, true, 1)
|
||||
// Create a session. Note that the acknowledgement of messages is not flushed to
|
||||
// the DB until the default buffer size of 1MB is acknowledged.
|
||||
val session = clientFactory!!.createSession(true, true, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)
|
||||
this.session = session
|
||||
session.start()
|
||||
|
||||
@ -170,9 +179,11 @@ class NodeMessagingClient(config: NodeConfiguration,
|
||||
null
|
||||
} ?: break
|
||||
|
||||
val message: Message? = artemisToCordaMessage(artemisMessage)
|
||||
// Use the magic deduplication property built into Artemis as our message identity too
|
||||
val uuid = UUID.fromString(artemisMessage.getStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID))
|
||||
val message: Message? = artemisToCordaMessage(artemisMessage, uuid)
|
||||
if (message != null)
|
||||
deliver(message)
|
||||
deliver(message, uuid)
|
||||
|
||||
// Ack the message so it won't be redelivered. We should only really do this when there were no
|
||||
// transient failures. If we caught an exception in the handler, we could back off and retry delivery
|
||||
@ -192,7 +203,7 @@ class NodeMessagingClient(config: NodeConfiguration,
|
||||
shutdownLatch.countDown()
|
||||
}
|
||||
|
||||
private fun artemisToCordaMessage(message: ClientMessage): Message? {
|
||||
private fun artemisToCordaMessage(message: ClientMessage, uuid: UUID): Message? {
|
||||
try {
|
||||
if (!message.containsProperty(TOPIC_PROPERTY)) {
|
||||
log.warn("Received message without a $TOPIC_PROPERTY property, ignoring")
|
||||
@ -204,7 +215,7 @@ class NodeMessagingClient(config: NodeConfiguration,
|
||||
}
|
||||
val topic = message.getStringProperty(TOPIC_PROPERTY)
|
||||
val sessionID = message.getLongProperty(SESSION_ID_PROPERTY)
|
||||
log.info("received message from: ${message.address} topic: $topic sessionID: $sessionID")
|
||||
log.info("received message from: ${message.address} topic: $topic sessionID: $sessionID uuid: $uuid")
|
||||
|
||||
val body = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }
|
||||
|
||||
@ -224,7 +235,7 @@ class NodeMessagingClient(config: NodeConfiguration,
|
||||
}
|
||||
}
|
||||
|
||||
private fun deliver(msg: Message): Boolean {
|
||||
private fun deliver(msg: Message, uuid: UUID): Boolean {
|
||||
state.checkNotLocked()
|
||||
// Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added
|
||||
// or removed whilst the filter is executing will not affect anything.
|
||||
@ -238,39 +249,49 @@ class NodeMessagingClient(config: NodeConfiguration,
|
||||
// This is a hack; transient messages held in memory isn't crash resistant.
|
||||
// TODO: Use Artemis API more effectively so we don't pop messages off a queue that we aren't ready to use.
|
||||
state.locked {
|
||||
undeliveredMessages += msg
|
||||
undeliveredMessages += Pair(msg, uuid)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
for (handler in deliverTo) {
|
||||
try {
|
||||
// This will perform a BLOCKING call onto the executor. Thus if the handlers are slow, we will
|
||||
// be slow, and Artemis can handle that case intelligently. We don't just invoke the handler
|
||||
// directly in order to ensure that we have the features of the AffinityExecutor class throughout
|
||||
// the bulk of the codebase and other non-messaging jobs can be scheduled onto the server executor
|
||||
// easily.
|
||||
//
|
||||
// Note that handlers may re-enter this class. We aren't holding any locks and methods like
|
||||
// start/run/stop have re-entrancy assertions at the top, so it is OK.
|
||||
executor.fetchFrom {
|
||||
// TODO: we should be able to clean this up if we separate client and server code, but for now
|
||||
// interpret persistent as "server" and non-persistent as "client".
|
||||
if (persistentInbox) {
|
||||
databaseTransaction {
|
||||
handler.callback(msg, handler)
|
||||
}
|
||||
} else {
|
||||
handler.callback(msg, handler)
|
||||
try {
|
||||
// This will perform a BLOCKING call onto the executor. Thus if the handlers are slow, we will
|
||||
// be slow, and Artemis can handle that case intelligently. We don't just invoke the handler
|
||||
// directly in order to ensure that we have the features of the AffinityExecutor class throughout
|
||||
// the bulk of the codebase and other non-messaging jobs can be scheduled onto the server executor
|
||||
// easily.
|
||||
//
|
||||
// Note that handlers may re-enter this class. We aren't holding any locks and methods like
|
||||
// start/run/stop have re-entrancy assertions at the top, so it is OK.
|
||||
executor.fetchFrom {
|
||||
// TODO: we should be able to clean this up if we separate client and server code, but for now
|
||||
// interpret persistent as "server" and non-persistent as "client".
|
||||
if (persistentInbox) {
|
||||
databaseTransaction {
|
||||
callHandlers(msg, uuid, deliverTo)
|
||||
}
|
||||
} else {
|
||||
callHandlers(msg, uuid, deliverTo)
|
||||
}
|
||||
} catch(e: Exception) {
|
||||
log.error("Caught exception whilst executing message handler for ${msg.topicSession}", e)
|
||||
}
|
||||
} catch(e: Exception) {
|
||||
log.error("Caught exception whilst executing message handler for ${msg.topicSession}", e)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
private fun callHandlers(msg: Message, uuid: UUID, deliverTo: List<Handler>) {
|
||||
if (uuid in processedMessages) {
|
||||
log.trace { "discard duplicate message $uuid for ${msg.topicSession}" }
|
||||
return
|
||||
}
|
||||
for (handler in deliverTo) {
|
||||
handler.callback(msg, handler)
|
||||
}
|
||||
// TODO We will at some point need to decide a trimming policy for the id's
|
||||
processedMessages += uuid
|
||||
}
|
||||
|
||||
override fun stop() {
|
||||
val running = state.locked {
|
||||
// We allow stop() to be called without a run() in between, but it must have at least been started.
|
||||
@ -311,18 +332,21 @@ class NodeMessagingClient(config: NodeConfiguration,
|
||||
|
||||
override fun send(message: Message, target: MessageRecipients) {
|
||||
val queueName = toQueueName(target)
|
||||
val uuid = UUID.randomUUID()
|
||||
state.locked {
|
||||
val artemisMessage = session!!.createMessage(true).apply {
|
||||
val sessionID = message.topicSession.sessionID
|
||||
putStringProperty(TOPIC_PROPERTY, message.topicSession.topic)
|
||||
putLongProperty(SESSION_ID_PROPERTY, sessionID)
|
||||
writeBodyBufferBytes(message.data)
|
||||
// Use the magic deduplication property built into Artemis as our message identity too
|
||||
putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(uuid.toString()))
|
||||
}
|
||||
|
||||
if (knownQueues.add(queueName)) {
|
||||
maybeCreateQueue(queueName)
|
||||
}
|
||||
log.info("send to: $queueName topic: ${message.topicSession.topic} sessionID: ${message.topicSession.sessionID}")
|
||||
log.info("send to: $queueName topic: ${message.topicSession.topic} sessionID: ${message.topicSession.sessionID} uuid: $uuid")
|
||||
producer!!.send(queueName, artemisMessage)
|
||||
}
|
||||
}
|
||||
@ -352,7 +376,7 @@ class NodeMessagingClient(config: NodeConfiguration,
|
||||
undeliveredMessages = listOf()
|
||||
messagesToRedeliver
|
||||
}
|
||||
messagesToRedeliver.forEach { deliver(it) }
|
||||
messagesToRedeliver.forEach { deliver(it.first, it.second) }
|
||||
return handler
|
||||
}
|
||||
|
||||
|
@ -88,7 +88,7 @@ class JDBCHashSet<K : Any>(tableName: String, loadOnInit: Boolean = false) : Abs
|
||||
*
|
||||
* See [AbstractJDBCHashMap] for implementation details.
|
||||
*/
|
||||
abstract class AbstractJDBCHashSet<K : Any, T : JDBCHashedTable>(table: T, loadOnInit: Boolean = false) : MutableSet<K>, AbstractSet<K>() {
|
||||
abstract class AbstractJDBCHashSet<K : Any, T : JDBCHashedTable>(protected val table: T, loadOnInit: Boolean = false) : MutableSet<K>, AbstractSet<K>() {
|
||||
protected val innerMap = object : AbstractJDBCHashMap<K, Unit, T>(table, loadOnInit) {
|
||||
override fun keyFromRow(it: ResultRow): K = this@AbstractJDBCHashSet.elementFromRow(it)
|
||||
|
||||
@ -104,9 +104,6 @@ abstract class AbstractJDBCHashSet<K : Any, T : JDBCHashedTable>(table: T, loadO
|
||||
|
||||
}
|
||||
|
||||
protected val table: T
|
||||
get() = innerMap.table
|
||||
|
||||
override fun add(element: K): Boolean {
|
||||
if (innerMap.containsKey(element)) {
|
||||
return false
|
||||
|
@ -118,7 +118,7 @@ class ArtemisMessagingTests {
|
||||
}
|
||||
|
||||
private fun createMessagingClient(server: HostAndPort = hostAndPort): NodeMessagingClient {
|
||||
return NodeMessagingClient(config, server, identity.public, AffinityExecutor.SAME_THREAD).apply {
|
||||
return NodeMessagingClient(config, server, identity.public, AffinityExecutor.SAME_THREAD, false).apply {
|
||||
configureWithDevSSLCertificate()
|
||||
messagingClient = this
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user