diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt
index 20ab49da70..83cb8b44f9 100644
--- a/node/src/main/kotlin/net/corda/node/internal/Node.kt
+++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt
@@ -127,7 +127,7 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
         }
         val legalIdentity = obtainLegalIdentity()
         val myIdentityOrNullIfNetworkMapService = if (networkMapService != null) legalIdentity.owningKey else null
-        return NodeMessagingClient(configuration, serverAddr, myIdentityOrNullIfNetworkMapService, serverThread, database)
+        return NodeMessagingClient(configuration, serverAddr, myIdentityOrNullIfNetworkMapService, serverThread, database, networkMapRegistrationFuture)
     }
 
     override fun startMessagingService(rpcOps: RPCOps) {
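The Node.kt change threads the node's `networkMapRegistrationFuture` into `NodeMessagingClient` so the client can defer general message consumption until network map registration completes. The client registers a completion callback via Corda's `net.corda.core.success` extension (imported below). A minimal sketch of that gating idiom, assuming a simplified `success` signature rather than copying the real helper:

```kotlin
// A minimal sketch of gating work on a Guava future, in the spirit of the
// networkMapRegistrationFuture.success { ... } call this patch adds. The `success`
// extension here is an assumed simplification of net.corda.core's helper, not a copy.
import com.google.common.util.concurrent.FutureCallback
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture

fun <T> ListenableFuture<T>.success(body: (T) -> Unit) {
    Futures.addCallback(this, object : FutureCallback<T> {
        override fun onSuccess(result: T?) = body(result!!)
        override fun onFailure(t: Throwable) {
            // Sketch only: the real helper routes failures elsewhere.
        }
    })
}

fun main(args: Array<String>) {
    val registration: SettableFuture<Unit> = SettableFuture.create()
    registration.success { println("Network map registered; safe to widen the consumer.") }
    registration.set(Unit)   // The callback fires once registration completes.
}
```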
diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt
index a033a056a8..07320bcadc 100644
--- a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt
+++ b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt
@@ -1,11 +1,13 @@
 package net.corda.node.services.messaging
 
 import com.google.common.net.HostAndPort
+import com.google.common.util.concurrent.ListenableFuture
 import net.corda.core.ThreadBox
 import net.corda.core.crypto.PublicKeyTree
 import net.corda.core.messaging.*
 import net.corda.core.serialization.SerializedBytes
 import net.corda.core.serialization.opaque
+import net.corda.core.success
 import net.corda.core.utilities.loggerFor
 import net.corda.core.utilities.trace
 import net.corda.node.services.RPCUserService
@@ -51,7 +53,8 @@ class NodeMessagingClient(override val config: NodeConfiguration,
                           val serverHostPort: HostAndPort,
                           val myIdentity: PublicKeyTree?,
                           val executor: AffinityExecutor,
-                          val database: Database) : ArtemisMessagingComponent(), MessagingServiceInternal {
+                          val database: Database,
+                          val networkMapRegistrationFuture: ListenableFuture<Unit>) : ArtemisMessagingComponent(), MessagingServiceInternal {
     companion object {
         val log = loggerFor<NodeMessagingClient>()
 
@@ -82,25 +85,6 @@ class NodeMessagingClient(override val config: NodeConfiguration,
         // Consumer for inbound client RPC messages.
         var rpcConsumer: ClientConsumer? = null
        var rpcNotificationConsumer: ClientConsumer? = null
-
-        private object Table : JDBCHashedTable("${NODE_DATABASE_PREFIX}pending_messages") {
-            val uuid = uuidString("message_id")
-            val message = blob("message")
-        }
-
-        val pendingRedelivery = object : AbstractJDBCHashMap<UUID, Message, Table>(Table, loadOnInit = false) {
-            override fun keyFromRow(row: ResultRow): UUID = row[table.uuid]
-
-            override fun valueFromRow(row: ResultRow): Message = deserializeFromBlob(row[table.message])
-
-            override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry<UUID, Message>, finalizables: MutableList<() -> Unit>) {
-                insert[table.uuid] = entry.key
-            }
-
-            override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry<UUID, Message>, finalizables: MutableList<() -> Unit>) {
-                insert[table.message] = serializeToBlob(entry.value, finalizables)
-            }
-        }
     }
 
     /** A registration to handle messages of different types */
@@ -155,7 +139,18 @@ class NodeMessagingClient(override val config: NodeConfiguration,
                 session.createQueue(queueName, queueName, true)
             }
             knownQueues.add(queueName)
-            p2pConsumer = session.createConsumer(queueName)
+            p2pConsumer = makeConsumer(session, queueName, true)
+            networkMapRegistrationFuture.success {
+                state.locked {
+                    log.info("Network map is complete, so removing filter from Artemis consumer.")
+                    try {
+                        p2pConsumer!!.close()
+                    } catch(e: ActiveMQObjectClosedException) {
+                        // Ignore it: this can happen if the server has gone away before we do.
+                    }
+                    p2pConsumer = makeConsumer(session, queueName, false)
+                }
+            }
 
             // Create an RPC queue and consumer: this will service locally connected clients only (not via a
             // bridge) and those clients must have authenticated. We could use a single consumer for everything
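The consumer swap above relies on Artemis core-API message filters: a consumer created with a filter string only ever receives matching messages, while non-matching ones remain parked on the queue until an unfiltered consumer appears, so nothing is lost during the gated phase. A standalone sketch of the mechanism, with placeholder queue and property names (the real filter string is built from `$TOPIC_PROPERTY`, as the `makeConsumer` hunk below shows):

```kotlin
// Illustrative sketch of the Artemis core-API filter mechanism makeConsumer relies on.
// Queue and property names here are placeholders, not Corda's actual constants.
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ClientConsumer
import org.apache.activemq.artemis.api.core.client.ClientSession

// A consumer created with a filter only ever sees matching messages; everything else
// stays parked on the queue, unconsumed, until a consumer without the filter appears.
fun networkMapOnlyConsumer(session: ClientSession, queue: SimpleString): ClientConsumer {
    val filter = SimpleString("hyphenated_props:'platform-topic' LIKE 'platform.network_map.%'")
    return session.createConsumer(queue, filter)
}

// Once the network map registration future completes, close the filtered consumer and
// recreate it without the filter so the queued backlog drains:
fun unfilteredConsumer(session: ClientSession, queue: SimpleString): ClientConsumer =
        session.createConsumer(queue)
```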
@@ -168,10 +163,59 @@
         }
     }
 
+    /**
+     * We create the consumer twice: once with a filter for just network map messages, and then, once the network map
+     * registration is complete, we close the original and create another without a filter. We do this so that a
+     * network map is in place before any other message handlers run.
+     */
+    private fun makeConsumer(session: ClientSession, queueName: SimpleString, networkMapOnly: Boolean): ClientConsumer {
+        return if (networkMapOnly) {
+            // Filter for just the network map messages.
+            val messageFilter = SimpleString("hyphenated_props:$TOPIC_PROPERTY like 'platform.network_map.%'")
+            session.createConsumer(queueName, messageFilter)
+        } else
+            session.createConsumer(queueName)
+    }
+
     private var shutdownLatch = CountDownLatch(1)
 
-    /** Starts the p2p event loop: this method only returns once [stop] has been called. */
-    fun run() {
+    private fun processMessage(consumer: ClientConsumer): Boolean {
+        // Two possibilities here:
+        //
+        // 1. We block waiting for a message and the consumer is closed in another thread. In this case
+        //    receive returns null and we break out of the loop.
+        // 2. We receive a message and process it, and stop() is called during delivery. In this case,
+        //    calling receive will throw and we break out of the loop.
+        //
+        // It's safe to call into receive simultaneously with other threads calling send on a producer.
+        val artemisMessage: ClientMessage = try {
+            consumer.receive()
+        } catch(e: ActiveMQObjectClosedException) {
+            null
+        } ?: return false
+
+        val message: Message? = artemisToCordaMessage(artemisMessage)
+        if (message != null)
+            deliver(message)
+
+        // Ack the message so it won't be redelivered. We should only really do this when there were no
+        // transient failures. If we caught an exception in the handler, we could back off and retry delivery
+        // a few times before giving up and redirecting the message to a dead-letter address for admin or
+        // developer inspection. Artemis has the features to do this for us, we just need to enable them.
+        //
+        // TODO: Setup Artemis delayed redelivery and dead letter addresses.
+        //
+        // ACKing a message calls back into the session, which isn't thread safe, so we have to ensure it
+        // doesn't collide with a send here. Note that stop() could have been called whilst we were
+        // processing a message but if so, it'll be parked waiting for us to count down the latch, so
+        // the session itself is still around and we can still ack messages as a result.
+        state.locked {
+            artemisMessage.acknowledge()
+        }
+        return true
+    }
+
+    private fun runPreNetworkMap() {
         val consumer = state.locked {
             check(started)
             check(!running) { "run can't be called twice" }
@@ -181,40 +225,32 @@ class NodeMessagingClient(override val config: NodeConfiguration,
             p2pConsumer!!
         }
 
-        while (true) {
-            // Two possibilities here:
-            //
-            // 1. We block waiting for a message and the consumer is closed in another thread. In this case
-            //    receive returns null and we break out of the loop.
-            // 2. We receive a message and process it, and stop() is called during delivery. In this case,
-            //    calling receive will throw and we break out of the loop.
-            //
-            // It's safe to call into receive simultaneous with other threads calling send on a producer.
-            val artemisMessage: ClientMessage = try {
-                consumer.receive()
-            } catch(e: ActiveMQObjectClosedException) {
-                null
-            } ?: break
-
-            val message: Message? = artemisToCordaMessage(artemisMessage)
-            if (message != null)
-                deliver(message)
-
-            // Ack the message so it won't be redelivered. We should only really do this when there were no
-            // transient failures. If we caught an exception in the handler, we could back off and retry delivery
-            // a few times before giving up and redirecting the message to a dead-letter address for admin or
-            // developer inspection. Artemis has the features to do this for us, we just need to enable them.
-            //
-            // TODO: Setup Artemis delayed redelivery and dead letter addresses.
-            //
-            // ACKing a message calls back into the session which isn't thread safe, so we have to ensure it
-            // doesn't collide with a send here. Note that stop() could have been called whilst we were
-            // processing a message but if so, it'll be parked waiting for us to count down the latch, so
-            // the session itself is still around and we can still ack messages as a result.
-            state.locked {
-                artemisMessage.acknowledge()
-            }
+        while (!networkMapRegistrationFuture.isDone && processMessage(consumer)) {
         }
+    }
+
+    private fun runPostNetworkMap() {
+        val consumer = state.locked {
+            // If it's null, it means we already called stop, so return immediately.
+            p2pConsumer ?: return
+        }
+
+        while (processMessage(consumer)) {
+        }
+    }
+
+    /**
+     * Starts the p2p event loop: this method only returns once [stop] has been called.
+     *
+     * This actually runs as two sequential loops. The first receives only network map messages, until we get our
+     * network map fetch response. At that point the filtering consumer is closed, and the second loop consumes all
+     * messages via a new consumer with no filter applied.
+     */
+    fun run() {
+        // Build the network map.
+        runPreNetworkMap()
+        // Process everything else once we have the network map.
+        runPostNetworkMap()
         shutdownLatch.countDown()
     }
@@ -252,25 +288,11 @@ class NodeMessagingClient(override val config: NodeConfiguration,
         }
     }
 
-    private fun deliver(msg: Message, redelivery: Boolean = false): Boolean {
+    private fun deliver(msg: Message): Boolean {
         state.checkNotLocked()
         // Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added
         // or removed whilst the filter is executing will not affect anything.
         val deliverTo = handlers.filter { it.topicSession.isBlank() || it.topicSession == msg.topicSession }
-
-        if (deliverTo.isEmpty() && !redelivery) {
-            // This should probably be downgraded to a trace in future, so the protocol can evolve with new topics
-            // without causing log spam.
-            log.warn("Received message for ${msg.topicSession} that doesn't have any registered handlers yet")
-
-            state.locked {
-                databaseTransaction(database) {
-                    pendingRedelivery[msg.uniqueMessageId] = msg
-                }
-            }
-            return false
-        }
-
         try {
             // This will perform a BLOCKING call onto the executor. Thus if the handlers are slow, we will
             // be slow, and Artemis can handle that case intelligently. We don't just invoke the handler
@@ -282,7 +304,18 @@ class NodeMessagingClient(override val config: NodeConfiguration,
             // start/run/stop have re-entrancy assertions at the top, so it is OK.
             executor.fetchFrom {
                 databaseTransaction(database) {
-                    callHandlers(msg, deliverTo, redelivery)
+                    if (msg.uniqueMessageId in processedMessages) {
+                        log.trace { "Discard duplicate message ${msg.uniqueMessageId} for ${msg.topicSession}" }
+                    } else {
+                        if (deliverTo.isEmpty()) {
+                            // TODO: Implement dead letter queue, and send it there.
+                            log.warn("Received message ${msg.uniqueMessageId} for ${msg.topicSession} that doesn't have any registered handlers yet")
+                        } else {
+                            callHandlers(msg, deliverTo)
+                        }
+                        // TODO We will at some point need to decide a trimming policy for the ids
+                        processedMessages += msg.uniqueMessageId
+                    }
                 }
             }
         } catch(e: Exception) {
@@ -291,26 +324,18 @@ class NodeMessagingClient(override val config: NodeConfiguration,
         return true
     }
 
-    private fun callHandlers(msg: Message, deliverTo: List<Handler>, redelivery: Boolean) {
-        if (msg.uniqueMessageId in processedMessages) {
-            log.trace { "discard duplicate message ${msg.uniqueMessageId} for ${msg.topicSession}" }
-            return
-        }
+    private fun callHandlers(msg: Message, deliverTo: List<Handler>) {
         for (handler in deliverTo) {
             handler.callback(msg, handler)
         }
-        // TODO We will at some point need to decide a trimming policy for the id's
-        processedMessages += msg.uniqueMessageId
-        if (redelivery) state.locked {
-            pendingRedelivery.remove(msg.uniqueMessageId)
-        }
     }
 
     override fun stop() {
         val running = state.locked {
             // We allow stop() to be called without a run() in between, but it must have at least been started.
             check(started)
-
+            val prevRunning = running
+            running = false
             val c = p2pConsumer ?: throw IllegalStateException("stop can't be called twice")
             try {
                 c.close()
@@ -318,8 +343,6 @@
                 // Ignore it: this can happen if the server has gone away before we do.
             }
             p2pConsumer = null
-            val prevRunning = running
-            running = false
             prevRunning
         }
         if (running && !executor.isOnThread) {
@@ -382,14 +405,6 @@
         require(!topicSession.isBlank()) { "Topic must not be blank, as the empty topic is a special case." }
         val handler = Handler(topicSession, callback)
         handlers.add(handler)
-        val messagesToRedeliver = state.locked {
-            val pending = ArrayList<Message>()
-            databaseTransaction(database) {
-                pending.addAll(pendingRedelivery.values)
-            }
-            pending
-        }
-        messagesToRedeliver.forEach { deliver(it, true) }
         return handler
     }
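Taken together, these changes mean `run()` drains only network map traffic until the registration future completes, then re-consumes everything else, while `deliver()` now deduplicates by unique message id instead of persisting unhandled messages to a redelivery table. A distilled sketch of that control flow, using hypothetical stand-in types rather than the real Artemis plumbing:

```kotlin
// Distilled sketch of the new control flow, with hypothetical stand-in types
// (Msg, a receive lambda); not the NodeMessagingClient API itself.
import com.google.common.util.concurrent.ListenableFuture

data class Msg(val id: String, val topic: String)

class TwoPhasePump(private val networkMapDone: ListenableFuture<Unit>,
                   private val receive: () -> Msg?) {
    private val processedIds = HashSet<String>()   // The patch leaves a TODO: this set needs a trimming policy.
    private val handlers = mutableMapOf<String, (Msg) -> Unit>()

    fun addHandler(topic: String, handler: (Msg) -> Unit) {
        handlers[topic] = handler
    }

    fun run() {
        // Phase 1: only network map traffic flows. The real code enforces this with an
        // Artemis consumer filter rather than an in-loop check, so other messages wait
        // on the broker instead of being received and ignored.
        while (!networkMapDone.isDone) deliver(receive() ?: return)
        // Phase 2: everything else, now that handlers can rely on the network map.
        while (true) deliver(receive() ?: return)
    }

    private fun deliver(msg: Msg) {
        if (!processedIds.add(msg.id)) return      // Duplicate: drop it silently.
        val handler = handlers[msg.topic]
        if (handler == null) {
            // The patch logs a warning here and leaves a TODO to dead-letter such messages.
            println("No handler yet for ${msg.topic}")
        } else {
            handler(msg)
        }
    }
}
```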
diff --git a/node/src/test/kotlin/net/corda/node/services/ArtemisMessagingTests.kt b/node/src/test/kotlin/net/corda/node/services/ArtemisMessagingTests.kt
index e60e99e084..b053990d6a 100644
--- a/node/src/test/kotlin/net/corda/node/services/ArtemisMessagingTests.kt
+++ b/node/src/test/kotlin/net/corda/node/services/ArtemisMessagingTests.kt
@@ -1,6 +1,10 @@
 package net.corda.node.services
 
 import com.google.common.net.HostAndPort
+import com.google.common.util.concurrent.Futures
+import com.google.common.util.concurrent.ListenableFuture
+import com.google.common.util.concurrent.SettableFuture
+import com.typesafe.config.ConfigFactory
 import net.corda.core.crypto.generateKeyPair
 import net.corda.core.crypto.tree
 import net.corda.core.messaging.Message
@@ -12,13 +16,13 @@ import net.corda.node.services.messaging.ArtemisMessagingServer
 import net.corda.node.services.messaging.NodeMessagingClient
 import net.corda.node.services.messaging.RPCOps
 import net.corda.node.services.network.InMemoryNetworkMapCache
+import net.corda.node.services.network.NetworkMapService
 import net.corda.node.services.transactions.PersistentUniquenessProvider
 import net.corda.node.utilities.AffinityExecutor
 import net.corda.node.utilities.configureDatabase
 import net.corda.node.utilities.databaseTransaction
 import net.corda.testing.freeLocalHostAndPort
 import net.corda.testing.node.makeTestDataSourceProperties
-import com.typesafe.config.ConfigFactory
 import org.assertj.core.api.Assertions.assertThatThrownBy
 import org.jetbrains.exposed.sql.Database
 import org.junit.After
@@ -34,6 +38,7 @@ import java.util.concurrent.TimeUnit.MILLISECONDS
 import kotlin.concurrent.thread
 import kotlin.test.assertEquals
 import kotlin.test.assertNull
+import kotlin.test.assertTrue
 
 class ArtemisMessagingTests {
     @Rule @JvmField val temporaryFolder = TemporaryFolder()
@@ -46,7 +51,7 @@ class ArtemisMessagingTests {
     lateinit var dataSource: Closeable
     lateinit var database: Database
     lateinit var userService: RPCUserService
-
+    lateinit var networkMapRegistrationFuture: ListenableFuture<Unit>
     var messagingClient: NodeMessagingClient? = null
     var messagingServer: ArtemisMessagingServer? = null
@@ -76,6 +81,7 @@ class ArtemisMessagingTests {
         val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
         dataSource = dataSourceAndDatabase.first
         database = dataSourceAndDatabase.second
+        networkMapRegistrationFuture = Futures.immediateFuture(Unit)
     }
 
     @After
@@ -99,7 +105,8 @@ class ArtemisMessagingTests {
         val remoteServerAddress = freeLocalHostAndPort()
 
         createMessagingServer(remoteServerAddress).start()
-        createMessagingClient(server = remoteServerAddress).start(rpcOps, userService)
+        createMessagingClient(server = remoteServerAddress)
+        startNodeMessagingClient()
     }
 
     @Test
@@ -110,30 +117,22 @@ class ArtemisMessagingTests {
         createMessagingServer(serverAddress).start()
 
         messagingClient = createMessagingClient(server = invalidServerAddress)
-        assertThatThrownBy { messagingClient!!.start(rpcOps, userService) }
+        assertThatThrownBy { startNodeMessagingClient() }
         messagingClient = null
     }
 
     @Test
     fun `client should connect to local server`() {
         createMessagingServer().start()
-        createMessagingClient().start(rpcOps, userService)
+        createMessagingClient()
+        startNodeMessagingClient()
     }
 
     @Test
     fun `client should be able to send message to itself`() {
         val receivedMessages = LinkedBlockingQueue<Message>()
 
-        createMessagingServer().start()
-
-        val messagingClient = createMessagingClient()
-        messagingClient.start(rpcOps, userService)
-        thread { messagingClient.run() }
-
-        messagingClient.addMessageHandler(topic) { message, r ->
-            receivedMessages.add(message)
-        }
-
+        val messagingClient = createAndStartClientAndServer(receivedMessages)
         val message = messagingClient.createMessage(topic, DEFAULT_SESSION_ID, "first msg".toByteArray())
         messagingClient.send(message, messagingClient.myAddress)
 
@@ -142,9 +141,88 @@ class ArtemisMessagingTests {
         assertNull(receivedMessages.poll(200, MILLISECONDS))
     }
 
+    @Test
+    fun `client should be able to send message to itself before network map is available, and receive after`() {
+        val settableFuture: SettableFuture<Unit> = SettableFuture.create()
+        networkMapRegistrationFuture = settableFuture
+
+        val receivedMessages = LinkedBlockingQueue<Message>()
+
+        val messagingClient = createAndStartClientAndServer(receivedMessages)
+        val message = messagingClient.createMessage(topic, DEFAULT_SESSION_ID, "first msg".toByteArray())
+        messagingClient.send(message, messagingClient.myAddress)
+
+        val networkMapMessage = messagingClient.createMessage(NetworkMapService.FETCH_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, "second msg".toByteArray())
+        messagingClient.send(networkMapMessage, messagingClient.myAddress)
+
+        val actual: Message = receivedMessages.take()
+        assertEquals("second msg", String(actual.data))
+        assertNull(receivedMessages.poll(200, MILLISECONDS))
+        settableFuture.set(Unit)
+        val firstActual: Message = receivedMessages.take()
+        assertEquals("first msg", String(firstActual.data))
+        assertNull(receivedMessages.poll(200, MILLISECONDS))
+    }
+
+    @Test
+    fun `client should be able to send large numbers of messages to itself before network map is available and survive restart, then receive messages`() {
+        // Crank the iterations up as high as you want... it just takes longer to run.
+        val iterations = 100
+        val settableFuture: SettableFuture<Unit> = SettableFuture.create()
+        networkMapRegistrationFuture = settableFuture
+
+        val receivedMessages = LinkedBlockingQueue<Message>()
+
+        val messagingClient = createAndStartClientAndServer(receivedMessages)
+        for (iter in 1..iterations) {
+            val message = messagingClient.createMessage(topic, DEFAULT_SESSION_ID, "first msg $iter".toByteArray())
+            messagingClient.send(message, messagingClient.myAddress)
+        }
+
+        val networkMapMessage = messagingClient.createMessage(NetworkMapService.FETCH_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, "second msg".toByteArray())
+        messagingClient.send(networkMapMessage, messagingClient.myAddress)
+
+        val actual: Message = receivedMessages.take()
+        assertEquals("second msg", String(actual.data))
+        assertNull(receivedMessages.poll(200, MILLISECONDS))
+
+        // Stop client and server and create afresh.
+        messagingClient.stop()
+        messagingServer?.stop()
+
+        networkMapRegistrationFuture = Futures.immediateFuture(Unit)
+
+        createAndStartClientAndServer(receivedMessages)
+        for (iter in 1..iterations) {
+            val firstActual: Message = receivedMessages.take()
+            assertTrue(String(firstActual.data).equals("first msg $iter"))
+        }
+        assertNull(receivedMessages.poll(200, MILLISECONDS))
+    }
+
+    private fun startNodeMessagingClient() {
+        messagingClient!!.start(rpcOps, userService)
+    }
+
+    private fun createAndStartClientAndServer(receivedMessages: LinkedBlockingQueue<Message>): NodeMessagingClient {
+        createMessagingServer().start()
+
+        val messagingClient = createMessagingClient()
+        startNodeMessagingClient()
+        messagingClient.addMessageHandler(topic) { message, r ->
+            receivedMessages.add(message)
+        }
+        messagingClient.addMessageHandler(NetworkMapService.FETCH_PROTOCOL_TOPIC) { message, r ->
+            receivedMessages.add(message)
+        }
+        // Run after the handlers are added, otherwise (some of) the messages get delivered and discarded / dead-lettered.
+        thread { messagingClient.run() }
+        return messagingClient
+    }
+
     private fun createMessagingClient(server: HostAndPort = hostAndPort): NodeMessagingClient {
         return databaseTransaction(database) {
-            NodeMessagingClient(config, server, identity.public.tree, AffinityExecutor.ServiceAffinityExecutor("ArtemisMessagingTests", 1), database).apply {
+            NodeMessagingClient(config, server, identity.public.tree, AffinityExecutor.ServiceAffinityExecutor("ArtemisMessagingTests", 1), database, networkMapRegistrationFuture).apply {
                 configureWithDevSSLCertificate()
                 messagingClient = this
            }
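The tests flip between two future flavours: `Futures.immediateFuture(Unit)` models a node whose network map registration is already complete (no gating), while a `SettableFuture` lets a test hold the gate shut and open it mid-test with `set(Unit)`. A minimal standalone illustration of that pattern; the function and variable names here are illustrative, not from the test suite:

```kotlin
// Standalone illustration of the two future flavours the tests swap between.
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.SettableFuture

fun whenRegistered(registration: ListenableFuture<Unit>, onReady: () -> Unit) {
    // If the future is already complete, the listener runs immediately.
    registration.addListener(Runnable { onReady() }, MoreExecutors.directExecutor())
}

fun main(args: Array<String>) {
    // Production-like path: registration already done, so the gate is open from the start.
    whenRegistered(Futures.immediateFuture(Unit)) { println("ran immediately") }

    // Test path: the gate stays shut until the test calls set(Unit), mirroring
    // settableFuture.set(Unit) in the tests above.
    val gate: SettableFuture<Unit> = SettableFuture.create()
    whenRegistered(gate) { println("ran after set()") }
    gate.set(Unit)
}
```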