mirror of
synced 2025-03-21 19:45:21 +00:00
Network map message filtering. Removal of pending messages. Needs cleaning up.
This commit is contained in:
@ -127,7 +127,7 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
val legalIdentity = obtainLegalIdentity()
val myIdentityOrNullIfNetworkMapService = if (networkMapService != null) legalIdentity.owningKey else null
return NodeMessagingClient(configuration, serverAddr, myIdentityOrNullIfNetworkMapService, serverThread, database)
return NodeMessagingClient(configuration, serverAddr, myIdentityOrNullIfNetworkMapService, serverThread, database, networkMapRegistrationFuture)
override fun startMessagingService(rpcOps: RPCOps) {
@ -1,11 +1,13 @@
package net.corda.node.services.messaging
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.ThreadBox
import net.corda.core.crypto.PublicKeyTree
import net.corda.core.messaging.*
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.opaque
import net.corda.core.success
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.trace
import net.corda.node.services.RPCUserService
@ -51,7 +53,8 @@ class NodeMessagingClient(override val config: NodeConfiguration,
val serverHostPort: HostAndPort,
val myIdentity: PublicKeyTree?,
val executor: AffinityExecutor,
val database: Database) : ArtemisMessagingComponent(), MessagingServiceInternal {
val database: Database,
val networkMapRegistrationFuture: ListenableFuture<Unit>) : ArtemisMessagingComponent(), MessagingServiceInternal {
companion object {
val log = loggerFor<NodeMessagingClient>()
@ -82,25 +85,6 @@ class NodeMessagingClient(override val config: NodeConfiguration,
// Consumer for inbound client RPC messages.
var rpcConsumer: ClientConsumer? = null
var rpcNotificationConsumer: ClientConsumer? = null
private object Table : JDBCHashedTable("${NODE_DATABASE_PREFIX}pending_messages") {
val uuid = uuidString("message_id")
val message = blob("message")
val pendingRedelivery = object : AbstractJDBCHashMap<UUID, Message, Table>(Table, loadOnInit = false) {
override fun keyFromRow(row: ResultRow): UUID = row[table.uuid]
override fun valueFromRow(row: ResultRow): Message = deserializeFromBlob(row[table.message])
override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry<UUID, Message>, finalizables: MutableList<() -> Unit>) {
insert[table.uuid] = entry.key
override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry<UUID, Message>, finalizables: MutableList<() -> Unit>) {
insert[table.message] = serializeToBlob(entry.value, finalizables)
/** A registration to handle messages of different types */
@ -155,7 +139,18 @@ class NodeMessagingClient(override val config: NodeConfiguration,
session.createQueue(queueName, queueName, true)
p2pConsumer = session.createConsumer(queueName)
p2pConsumer = makeConsumer(session, queueName, true)
networkMapRegistrationFuture.success {
state.locked {
log.info("Network map is complete, so removing filter from Artemis consumer.")
try {
} catch(e: ActiveMQObjectClosedException) {
// Ignore it: this can happen if the server has gone away before we do.
p2pConsumer = makeConsumer(session, queueName, false)
// Create an RPC queue and consumer: this will service locally connected clients only (not via a
// bridge) and those clients must have authenticated. We could use a single consumer for everything
@ -168,10 +163,59 @@ class NodeMessagingClient(override val config: NodeConfiguration,
* We make the consumer twice, once to filter for just network map messages, and then once that is complete, we close
* the original and make another without a filter. We do this so that there is a network map in place for all other
* message handlers.
private fun makeConsumer(session: ClientSession, queueName: SimpleString, networkMapOnly: Boolean): ClientConsumer {
return if (networkMapOnly) {
// Filter for just the network map messages.
val messageFilter = SimpleString("hyphenated_props:$TOPIC_PROPERTY like 'platform.network_map.%'")
session.createConsumer(queueName, messageFilter)
} else
private var shutdownLatch = CountDownLatch(1)
/** Starts the p2p event loop: this method only returns once [stop] has been called. */
fun run() {
private fun processMessage(consumer: ClientConsumer): Boolean {
// Two possibilities here:
// 1. We block waiting for a message and the consumer is closed in another thread. In this case
// receive returns null and we break out of the loop.
// 2. We receive a message and process it, and stop() is called during delivery. In this case,
// calling receive will throw and we break out of the loop.
// It's safe to call into receive simultaneous with other threads calling send on a producer.
val artemisMessage: ClientMessage = try {
} catch(e: ActiveMQObjectClosedException) {
} ?: return false
val message: Message? = artemisToCordaMessage(artemisMessage)
if (message != null)
// Ack the message so it won't be redelivered. We should only really do this when there were no
// transient failures. If we caught an exception in the handler, we could back off and retry delivery
// a few times before giving up and redirecting the message to a dead-letter address for admin or
// developer inspection. Artemis has the features to do this for us, we just need to enable them.
// TODO: Setup Artemis delayed redelivery and dead letter addresses.
// ACKing a message calls back into the session which isn't thread safe, so we have to ensure it
// doesn't collide with a send here. Note that stop() could have been called whilst we were
// processing a message but if so, it'll be parked waiting for us to count down the latch, so
// the session itself is still around and we can still ack messages as a result.
state.locked {
return true
private fun runPreNetworkMap() {
val consumer = state.locked {
check(!running) { "run can't be called twice" }
@ -181,40 +225,32 @@ class NodeMessagingClient(override val config: NodeConfiguration,
while (true) {
// Two possibilities here:
// 1. We block waiting for a message and the consumer is closed in another thread. In this case
// receive returns null and we break out of the loop.
// 2. We receive a message and process it, and stop() is called during delivery. In this case,
// calling receive will throw and we break out of the loop.
// It's safe to call into receive simultaneous with other threads calling send on a producer.
val artemisMessage: ClientMessage = try {
} catch(e: ActiveMQObjectClosedException) {
} ?: break
val message: Message? = artemisToCordaMessage(artemisMessage)
if (message != null)
// Ack the message so it won't be redelivered. We should only really do this when there were no
// transient failures. If we caught an exception in the handler, we could back off and retry delivery
// a few times before giving up and redirecting the message to a dead-letter address for admin or
// developer inspection. Artemis has the features to do this for us, we just need to enable them.
// TODO: Setup Artemis delayed redelivery and dead letter addresses.
// ACKing a message calls back into the session which isn't thread safe, so we have to ensure it
// doesn't collide with a send here. Note that stop() could have been called whilst we were
// processing a message but if so, it'll be parked waiting for us to count down the latch, so
// the session itself is still around and we can still ack messages as a result.
state.locked {
while (!networkMapRegistrationFuture.isDone && processMessage(consumer)) {
private fun runPostNetworkMap() {
val consumer = state.locked {
// If it's null, it means we already called stop, so return immediately.
p2pConsumer ?: return
while (processMessage(consumer)) {
* Starts the p2p event loop: this method only returns once [stop] has been called.
* This actually runs as two sequential loops. The first subscribes for and receives only network map messages until
* we get our network map fetch response. At that point the filtering consumer is closed and we proceed to the second loop and
* consume all messages via a new consumer without a filter applied.
fun run() {
// Build the network map.
// Process everything else once we have the network map.
@ -252,25 +288,11 @@ class NodeMessagingClient(override val config: NodeConfiguration,
private fun deliver(msg: Message, redelivery: Boolean = false): Boolean {
private fun deliver(msg: Message): Boolean {
// Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added
// or removed whilst the filter is executing will not affect anything.
val deliverTo = handlers.filter { it.topicSession.isBlank() || it.topicSession == msg.topicSession }
if (deliverTo.isEmpty() && !redelivery) {
// This should probably be downgraded to a trace in future, so the protocol can evolve with new topics
// without causing log spam.
log.warn("Received message for ${msg.topicSession} that doesn't have any registered handlers yet")
state.locked {
databaseTransaction(database) {
pendingRedelivery[msg.uniqueMessageId] = msg
return false
try {
// This will perform a BLOCKING call onto the executor. Thus if the handlers are slow, we will
// be slow, and Artemis can handle that case intelligently. We don't just invoke the handler
@ -282,7 +304,18 @@ class NodeMessagingClient(override val config: NodeConfiguration,
// start/run/stop have re-entrancy assertions at the top, so it is OK.
executor.fetchFrom {
databaseTransaction(database) {
callHandlers(msg, deliverTo, redelivery)
if (msg.uniqueMessageId in processedMessages) {
log.trace { "Discard duplicate message ${msg.uniqueMessageId} for ${msg.topicSession}" }
} else {
if (deliverTo.isEmpty()) {
// TODO: Implement dead letter queue, and send it there.
log.warn("Received message ${msg.uniqueMessageId} for ${msg.topicSession} that doesn't have any registered handlers yet")
} else {
callHandlers(msg, deliverTo)
// TODO We will at some point need to decide a trimming policy for the id's
processedMessages += msg.uniqueMessageId
} catch(e: Exception) {
@ -291,26 +324,18 @@ class NodeMessagingClient(override val config: NodeConfiguration,
return true
private fun callHandlers(msg: Message, deliverTo: List<Handler>, redelivery: Boolean) {
if (msg.uniqueMessageId in processedMessages) {
log.trace { "discard duplicate message ${msg.uniqueMessageId} for ${msg.topicSession}" }
private fun callHandlers(msg: Message, deliverTo: List<Handler>) {
for (handler in deliverTo) {
handler.callback(msg, handler)
// TODO We will at some point need to decide a trimming policy for the id's
processedMessages += msg.uniqueMessageId
if (redelivery) state.locked {
override fun stop() {
val running = state.locked {
// We allow stop() to be called without a run() in between, but it must have at least been started.
val prevRunning = running
running = false
val c = p2pConsumer ?: throw IllegalStateException("stop can't be called twice")
try {
@ -318,8 +343,6 @@ class NodeMessagingClient(override val config: NodeConfiguration,
// Ignore it: this can happen if the server has gone away before we do.
p2pConsumer = null
val prevRunning = running
running = false
if (running && !executor.isOnThread) {
@ -382,14 +405,6 @@ class NodeMessagingClient(override val config: NodeConfiguration,
require(!topicSession.isBlank()) { "Topic must not be blank, as the empty topic is a special case." }
val handler = Handler(topicSession, callback)
val messagesToRedeliver = state.locked {
val pending = ArrayList<Message>()
databaseTransaction(database) {
messagesToRedeliver.forEach { deliver(it, true) }
return handler
@ -1,6 +1,10 @@
package net.corda.node.services
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import com.typesafe.config.ConfigFactory
import net.corda.core.crypto.generateKeyPair
import net.corda.core.crypto.tree
import net.corda.core.messaging.Message
@ -12,13 +16,13 @@ import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.node.services.messaging.NodeMessagingClient
import net.corda.node.services.messaging.RPCOps
import net.corda.node.services.network.InMemoryNetworkMapCache
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.configureDatabase
import net.corda.node.utilities.databaseTransaction
import net.corda.testing.freeLocalHostAndPort
import net.corda.testing.node.makeTestDataSourceProperties
import com.typesafe.config.ConfigFactory
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.jetbrains.exposed.sql.Database
import org.junit.After
@ -34,6 +38,7 @@ import java.util.concurrent.TimeUnit.MILLISECONDS
import kotlin.concurrent.thread
import kotlin.test.assertEquals
import kotlin.test.assertNull
import kotlin.test.assertTrue
class ArtemisMessagingTests {
@Rule @JvmField val temporaryFolder = TemporaryFolder()
@ -46,7 +51,7 @@ class ArtemisMessagingTests {
lateinit var dataSource: Closeable
lateinit var database: Database
lateinit var userService: RPCUserService
lateinit var networkMapRegistrationFuture: ListenableFuture<Unit>
var messagingClient: NodeMessagingClient? = null
var messagingServer: ArtemisMessagingServer? = null
@ -76,6 +81,7 @@ class ArtemisMessagingTests {
val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
dataSource = dataSourceAndDatabase.first
database = dataSourceAndDatabase.second
networkMapRegistrationFuture = Futures.immediateFuture(Unit)
@ -99,7 +105,8 @@ class ArtemisMessagingTests {
val remoteServerAddress = freeLocalHostAndPort()
createMessagingClient(server = remoteServerAddress).start(rpcOps, userService)
createMessagingClient(server = remoteServerAddress)
@ -110,30 +117,22 @@ class ArtemisMessagingTests {
messagingClient = createMessagingClient(server = invalidServerAddress)
assertThatThrownBy { messagingClient!!.start(rpcOps, userService) }
assertThatThrownBy { startNodeMessagingClient() }
messagingClient = null
fun `client should connect to local server`() {
createMessagingClient().start(rpcOps, userService)
fun `client should be able to send message to itself`() {
val receivedMessages = LinkedBlockingQueue<Message>()
val messagingClient = createMessagingClient()
messagingClient.start(rpcOps, userService)
thread { messagingClient.run() }
messagingClient.addMessageHandler(topic) { message, r ->
val messagingClient = createAndStartClientAndServer(receivedMessages)
val message = messagingClient.createMessage(topic, DEFAULT_SESSION_ID, "first msg".toByteArray())
messagingClient.send(message, messagingClient.myAddress)
@ -142,9 +141,88 @@ class ArtemisMessagingTests {
assertNull(receivedMessages.poll(200, MILLISECONDS))
fun `client should be able to send message to itself before network map is available, and receive after`() {
val settableFuture: SettableFuture<Unit> = SettableFuture.create()
networkMapRegistrationFuture = settableFuture
val receivedMessages = LinkedBlockingQueue<Message>()
val messagingClient = createAndStartClientAndServer(receivedMessages)
val message = messagingClient.createMessage(topic, DEFAULT_SESSION_ID, "first msg".toByteArray())
messagingClient.send(message, messagingClient.myAddress)
val networkMapMessage = messagingClient.createMessage(NetworkMapService.FETCH_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, "second msg".toByteArray())
messagingClient.send(networkMapMessage, messagingClient.myAddress)
val actual: Message = receivedMessages.take()
assertEquals("second msg", String(actual.data))
assertNull(receivedMessages.poll(200, MILLISECONDS))
val firstActual: Message = receivedMessages.take()
assertEquals("first msg", String(firstActual.data))
assertNull(receivedMessages.poll(200, MILLISECONDS))
fun `client should be able to send large numbers of messages to itself before network map is available and survive restart, then receive messages`() {
// Crank the iteration up as high as you want... just takes longer to run.
val iterations = 100
val settableFuture: SettableFuture<Unit> = SettableFuture.create()
networkMapRegistrationFuture = settableFuture
val receivedMessages = LinkedBlockingQueue<Message>()
val messagingClient = createAndStartClientAndServer(receivedMessages)
for (iter in 1..iterations) {
val message = messagingClient.createMessage(topic, DEFAULT_SESSION_ID, "first msg $iter".toByteArray())
messagingClient.send(message, messagingClient.myAddress)
val networkMapMessage = messagingClient.createMessage(NetworkMapService.FETCH_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, "second msg".toByteArray())
messagingClient.send(networkMapMessage, messagingClient.myAddress)
val actual: Message = receivedMessages.take()
assertEquals("second msg", String(actual.data))
assertNull(receivedMessages.poll(200, MILLISECONDS))
// Stop client and server and create afresh.
networkMapRegistrationFuture = Futures.immediateFuture(Unit)
for (iter in 1..iterations) {
val firstActual: Message = receivedMessages.take()
assertTrue(String(firstActual.data).equals("first msg $iter"))
assertNull(receivedMessages.poll(200, MILLISECONDS))
private fun startNodeMessagingClient() {
messagingClient!!.start(rpcOps, userService)
private fun createAndStartClientAndServer(receivedMessages: LinkedBlockingQueue<Message>): NodeMessagingClient {
val messagingClient = createMessagingClient()
messagingClient.addMessageHandler(topic) { message, r ->
messagingClient.addMessageHandler(NetworkMapService.FETCH_PROTOCOL_TOPIC) { message, r ->
// Run after the handlers are added, otherwise (some of) the messages get delivered and discarded / dead-lettered.
thread { messagingClient.run() }
return messagingClient
private fun createMessagingClient(server: HostAndPort = hostAndPort): NodeMessagingClient {
return databaseTransaction(database) {
NodeMessagingClient(config, server, identity.public.tree, AffinityExecutor.ServiceAffinityExecutor("ArtemisMessagingTests", 1), database).apply {
NodeMessagingClient(config, server, identity.public.tree, AffinityExecutor.ServiceAffinityExecutor("ArtemisMessagingTests", 1), database, networkMapRegistrationFuture).apply {
messagingClient = this
Reference in New Issue
Block a user