Merged in parkri-pending-messages (pull request #481)

Remove need for pending messages map/table and fix start up race between network map and protocols wanting to look up Party
This commit is contained in:
Rick Parker 2016-11-17 17:12:35 +00:00
commit 0c9b03411a
3 changed files with 203 additions and 110 deletions

View File

@ -127,7 +127,7 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
} }
val legalIdentity = obtainLegalIdentity() val legalIdentity = obtainLegalIdentity()
val myIdentityOrNullIfNetworkMapService = if (networkMapService != null) legalIdentity.owningKey else null val myIdentityOrNullIfNetworkMapService = if (networkMapService != null) legalIdentity.owningKey else null
return NodeMessagingClient(configuration, serverAddr, myIdentityOrNullIfNetworkMapService, serverThread, database) return NodeMessagingClient(configuration, serverAddr, myIdentityOrNullIfNetworkMapService, serverThread, database, networkMapRegistrationFuture)
} }
override fun startMessagingService(rpcOps: RPCOps) { override fun startMessagingService(rpcOps: RPCOps) {

View File

@ -1,11 +1,13 @@
package net.corda.node.services.messaging package net.corda.node.services.messaging
import com.google.common.net.HostAndPort import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.ThreadBox import net.corda.core.ThreadBox
import net.corda.core.crypto.PublicKeyTree import net.corda.core.crypto.PublicKeyTree
import net.corda.core.messaging.* import net.corda.core.messaging.*
import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.opaque import net.corda.core.serialization.opaque
import net.corda.core.success
import net.corda.core.utilities.loggerFor import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.trace import net.corda.core.utilities.trace
import net.corda.node.services.RPCUserService import net.corda.node.services.RPCUserService
@ -51,7 +53,8 @@ class NodeMessagingClient(override val config: NodeConfiguration,
val serverHostPort: HostAndPort, val serverHostPort: HostAndPort,
val myIdentity: PublicKeyTree?, val myIdentity: PublicKeyTree?,
val executor: AffinityExecutor, val executor: AffinityExecutor,
val database: Database) : ArtemisMessagingComponent(), MessagingServiceInternal { val database: Database,
val networkMapRegistrationFuture: ListenableFuture<Unit>) : ArtemisMessagingComponent(), MessagingServiceInternal {
companion object { companion object {
val log = loggerFor<NodeMessagingClient>() val log = loggerFor<NodeMessagingClient>()
@ -82,25 +85,6 @@ class NodeMessagingClient(override val config: NodeConfiguration,
// Consumer for inbound client RPC messages. // Consumer for inbound client RPC messages.
var rpcConsumer: ClientConsumer? = null var rpcConsumer: ClientConsumer? = null
var rpcNotificationConsumer: ClientConsumer? = null var rpcNotificationConsumer: ClientConsumer? = null
private object Table : JDBCHashedTable("${NODE_DATABASE_PREFIX}pending_messages") {
val uuid = uuidString("message_id")
val message = blob("message")
}
val pendingRedelivery = object : AbstractJDBCHashMap<UUID, Message, Table>(Table, loadOnInit = false) {
override fun keyFromRow(row: ResultRow): UUID = row[table.uuid]
override fun valueFromRow(row: ResultRow): Message = deserializeFromBlob(row[table.message])
override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry<UUID, Message>, finalizables: MutableList<() -> Unit>) {
insert[table.uuid] = entry.key
}
override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry<UUID, Message>, finalizables: MutableList<() -> Unit>) {
insert[table.message] = serializeToBlob(entry.value, finalizables)
}
}
} }
/** A registration to handle messages of different types */ /** A registration to handle messages of different types */
@ -155,7 +139,18 @@ class NodeMessagingClient(override val config: NodeConfiguration,
session.createQueue(queueName, queueName, true) session.createQueue(queueName, queueName, true)
} }
knownQueues.add(queueName) knownQueues.add(queueName)
p2pConsumer = session.createConsumer(queueName) p2pConsumer = makeConsumer(session, queueName, true)
networkMapRegistrationFuture.success {
state.locked {
log.info("Network map is complete, so removing filter from Artemis consumer.")
try {
p2pConsumer!!.close()
} catch(e: ActiveMQObjectClosedException) {
// Ignore it: this can happen if the server has gone away before we do.
}
p2pConsumer = makeConsumer(session, queueName, false)
}
}
// Create an RPC queue and consumer: this will service locally connected clients only (not via a // Create an RPC queue and consumer: this will service locally connected clients only (not via a
// bridge) and those clients must have authenticated. We could use a single consumer for everything // bridge) and those clients must have authenticated. We could use a single consumer for everything
@ -168,20 +163,23 @@ class NodeMessagingClient(override val config: NodeConfiguration,
} }
} }
private var shutdownLatch = CountDownLatch(1) /**
* We make the consumer twice, once to filter for just network map messages, and then once that is complete, we close
/** Starts the p2p event loop: this method only returns once [stop] has been called. */ * the original and make another without a filter. We do this so that there is a network map in place for all other
fun run() { * message handlers.
val consumer = state.locked { */
check(started) private fun makeConsumer(session: ClientSession, queueName: SimpleString, networkMapOnly: Boolean): ClientConsumer {
check(!running) { "run can't be called twice" } return if (networkMapOnly) {
running = true // Filter for just the network map messages.
// Optionally, start RPC dispatch. val messageFilter = SimpleString("hyphenated_props:$TOPIC_PROPERTY like 'platform.network_map.%'")
dispatcher?.start(rpcConsumer!!, rpcNotificationConsumer!!, executor) session.createConsumer(queueName, messageFilter)
p2pConsumer!! } else
session.createConsumer(queueName)
} }
while (true) { private var shutdownLatch = CountDownLatch(1)
private fun processMessage(consumer: ClientConsumer): Boolean {
// Two possibilities here: // Two possibilities here:
// //
// 1. We block waiting for a message and the consumer is closed in another thread. In this case // 1. We block waiting for a message and the consumer is closed in another thread. In this case
@ -194,7 +192,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
consumer.receive() consumer.receive()
} catch(e: ActiveMQObjectClosedException) { } catch(e: ActiveMQObjectClosedException) {
null null
} ?: break } ?: return false
val message: Message? = artemisToCordaMessage(artemisMessage) val message: Message? = artemisToCordaMessage(artemisMessage)
if (message != null) if (message != null)
@ -214,7 +212,45 @@ class NodeMessagingClient(override val config: NodeConfiguration,
state.locked { state.locked {
artemisMessage.acknowledge() artemisMessage.acknowledge()
} }
return true
} }
private fun runPreNetworkMap() {
val consumer = state.locked {
check(started)
check(!running) { "run can't be called twice" }
running = true
// Optionally, start RPC dispatch.
dispatcher?.start(rpcConsumer!!, rpcNotificationConsumer!!, executor)
p2pConsumer!!
}
while (!networkMapRegistrationFuture.isDone && processMessage(consumer)) {
}
}
private fun runPostNetworkMap() {
val consumer = state.locked {
// If it's null, it means we already called stop, so return immediately.
p2pConsumer ?: return
}
while (processMessage(consumer)) {
}
}
/**
* Starts the p2p event loop: this method only returns once [stop] has been called.
*
* This actually runs as two sequential loops. The first subscribes for and receives only network map messages until
* we get our network map fetch response. At that point the filtering consumer is closed and we proceed to the second loop and
* consume all messages via a new consumer without a filter applied.
*/
fun run() {
// Build the network map.
runPreNetworkMap()
// Process everything else once we have the network map.
runPostNetworkMap()
shutdownLatch.countDown() shutdownLatch.countDown()
} }
@ -252,25 +288,11 @@ class NodeMessagingClient(override val config: NodeConfiguration,
} }
} }
private fun deliver(msg: Message, redelivery: Boolean = false): Boolean { private fun deliver(msg: Message): Boolean {
state.checkNotLocked() state.checkNotLocked()
// Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added // Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added
// or removed whilst the filter is executing will not affect anything. // or removed whilst the filter is executing will not affect anything.
val deliverTo = handlers.filter { it.topicSession.isBlank() || it.topicSession == msg.topicSession } val deliverTo = handlers.filter { it.topicSession.isBlank() || it.topicSession == msg.topicSession }
if (deliverTo.isEmpty() && !redelivery) {
// This should probably be downgraded to a trace in future, so the protocol can evolve with new topics
// without causing log spam.
log.warn("Received message for ${msg.topicSession} that doesn't have any registered handlers yet")
state.locked {
databaseTransaction(database) {
pendingRedelivery[msg.uniqueMessageId] = msg
}
}
return false
}
try { try {
// This will perform a BLOCKING call onto the executor. Thus if the handlers are slow, we will // This will perform a BLOCKING call onto the executor. Thus if the handlers are slow, we will
// be slow, and Artemis can handle that case intelligently. We don't just invoke the handler // be slow, and Artemis can handle that case intelligently. We don't just invoke the handler
@ -282,7 +304,18 @@ class NodeMessagingClient(override val config: NodeConfiguration,
// start/run/stop have re-entrancy assertions at the top, so it is OK. // start/run/stop have re-entrancy assertions at the top, so it is OK.
executor.fetchFrom { executor.fetchFrom {
databaseTransaction(database) { databaseTransaction(database) {
callHandlers(msg, deliverTo, redelivery) if (msg.uniqueMessageId in processedMessages) {
log.trace { "Discard duplicate message ${msg.uniqueMessageId} for ${msg.topicSession}" }
} else {
if (deliverTo.isEmpty()) {
// TODO: Implement dead letter queue, and send it there.
log.warn("Received message ${msg.uniqueMessageId} for ${msg.topicSession} that doesn't have any registered handlers yet")
} else {
callHandlers(msg, deliverTo)
}
// TODO We will at some point need to decide a trimming policy for the id's
processedMessages += msg.uniqueMessageId
}
} }
} }
} catch(e: Exception) { } catch(e: Exception) {
@ -291,26 +324,18 @@ class NodeMessagingClient(override val config: NodeConfiguration,
return true return true
} }
private fun callHandlers(msg: Message, deliverTo: List<Handler>, redelivery: Boolean) { private fun callHandlers(msg: Message, deliverTo: List<Handler>) {
if (msg.uniqueMessageId in processedMessages) {
log.trace { "discard duplicate message ${msg.uniqueMessageId} for ${msg.topicSession}" }
return
}
for (handler in deliverTo) { for (handler in deliverTo) {
handler.callback(msg, handler) handler.callback(msg, handler)
} }
// TODO We will at some point need to decide a trimming policy for the id's
processedMessages += msg.uniqueMessageId
if (redelivery) state.locked {
pendingRedelivery.remove(msg.uniqueMessageId)
}
} }
override fun stop() { override fun stop() {
val running = state.locked { val running = state.locked {
// We allow stop() to be called without a run() in between, but it must have at least been started. // We allow stop() to be called without a run() in between, but it must have at least been started.
check(started) check(started)
val prevRunning = running
running = false
val c = p2pConsumer ?: throw IllegalStateException("stop can't be called twice") val c = p2pConsumer ?: throw IllegalStateException("stop can't be called twice")
try { try {
c.close() c.close()
@ -318,8 +343,6 @@ class NodeMessagingClient(override val config: NodeConfiguration,
// Ignore it: this can happen if the server has gone away before we do. // Ignore it: this can happen if the server has gone away before we do.
} }
p2pConsumer = null p2pConsumer = null
val prevRunning = running
running = false
prevRunning prevRunning
} }
if (running && !executor.isOnThread) { if (running && !executor.isOnThread) {
@ -382,14 +405,6 @@ class NodeMessagingClient(override val config: NodeConfiguration,
require(!topicSession.isBlank()) { "Topic must not be blank, as the empty topic is a special case." } require(!topicSession.isBlank()) { "Topic must not be blank, as the empty topic is a special case." }
val handler = Handler(topicSession, callback) val handler = Handler(topicSession, callback)
handlers.add(handler) handlers.add(handler)
val messagesToRedeliver = state.locked {
val pending = ArrayList<Message>()
databaseTransaction(database) {
pending.addAll(pendingRedelivery.values)
}
pending
}
messagesToRedeliver.forEach { deliver(it, true) }
return handler return handler
} }

View File

@ -1,6 +1,10 @@
package net.corda.node.services package net.corda.node.services
import com.google.common.net.HostAndPort import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import com.typesafe.config.ConfigFactory
import net.corda.core.crypto.generateKeyPair import net.corda.core.crypto.generateKeyPair
import net.corda.core.crypto.tree import net.corda.core.crypto.tree
import net.corda.core.messaging.Message import net.corda.core.messaging.Message
@ -12,13 +16,13 @@ import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.node.services.messaging.NodeMessagingClient import net.corda.node.services.messaging.NodeMessagingClient
import net.corda.node.services.messaging.RPCOps import net.corda.node.services.messaging.RPCOps
import net.corda.node.services.network.InMemoryNetworkMapCache import net.corda.node.services.network.InMemoryNetworkMapCache
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.PersistentUniquenessProvider import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.configureDatabase import net.corda.node.utilities.configureDatabase
import net.corda.node.utilities.databaseTransaction import net.corda.node.utilities.databaseTransaction
import net.corda.testing.freeLocalHostAndPort import net.corda.testing.freeLocalHostAndPort
import net.corda.testing.node.makeTestDataSourceProperties import net.corda.testing.node.makeTestDataSourceProperties
import com.typesafe.config.ConfigFactory
import org.assertj.core.api.Assertions.assertThatThrownBy import org.assertj.core.api.Assertions.assertThatThrownBy
import org.jetbrains.exposed.sql.Database import org.jetbrains.exposed.sql.Database
import org.junit.After import org.junit.After
@ -34,6 +38,7 @@ import java.util.concurrent.TimeUnit.MILLISECONDS
import kotlin.concurrent.thread import kotlin.concurrent.thread
import kotlin.test.assertEquals import kotlin.test.assertEquals
import kotlin.test.assertNull import kotlin.test.assertNull
import kotlin.test.assertTrue
class ArtemisMessagingTests { class ArtemisMessagingTests {
@Rule @JvmField val temporaryFolder = TemporaryFolder() @Rule @JvmField val temporaryFolder = TemporaryFolder()
@ -46,7 +51,7 @@ class ArtemisMessagingTests {
lateinit var dataSource: Closeable lateinit var dataSource: Closeable
lateinit var database: Database lateinit var database: Database
lateinit var userService: RPCUserService lateinit var userService: RPCUserService
lateinit var networkMapRegistrationFuture: ListenableFuture<Unit>
var messagingClient: NodeMessagingClient? = null var messagingClient: NodeMessagingClient? = null
var messagingServer: ArtemisMessagingServer? = null var messagingServer: ArtemisMessagingServer? = null
@ -76,6 +81,7 @@ class ArtemisMessagingTests {
val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties()) val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
dataSource = dataSourceAndDatabase.first dataSource = dataSourceAndDatabase.first
database = dataSourceAndDatabase.second database = dataSourceAndDatabase.second
networkMapRegistrationFuture = Futures.immediateFuture(Unit)
} }
@After @After
@ -99,7 +105,8 @@ class ArtemisMessagingTests {
val remoteServerAddress = freeLocalHostAndPort() val remoteServerAddress = freeLocalHostAndPort()
createMessagingServer(remoteServerAddress).start() createMessagingServer(remoteServerAddress).start()
createMessagingClient(server = remoteServerAddress).start(rpcOps, userService) createMessagingClient(server = remoteServerAddress)
startNodeMessagingClient()
} }
@Test @Test
@ -110,30 +117,22 @@ class ArtemisMessagingTests {
createMessagingServer(serverAddress).start() createMessagingServer(serverAddress).start()
messagingClient = createMessagingClient(server = invalidServerAddress) messagingClient = createMessagingClient(server = invalidServerAddress)
assertThatThrownBy { messagingClient!!.start(rpcOps, userService) } assertThatThrownBy { startNodeMessagingClient() }
messagingClient = null messagingClient = null
} }
@Test @Test
fun `client should connect to local server`() { fun `client should connect to local server`() {
createMessagingServer().start() createMessagingServer().start()
createMessagingClient().start(rpcOps, userService) createMessagingClient()
startNodeMessagingClient()
} }
@Test @Test
fun `client should be able to send message to itself`() { fun `client should be able to send message to itself`() {
val receivedMessages = LinkedBlockingQueue<Message>() val receivedMessages = LinkedBlockingQueue<Message>()
createMessagingServer().start() val messagingClient = createAndStartClientAndServer(receivedMessages)
val messagingClient = createMessagingClient()
messagingClient.start(rpcOps, userService)
thread { messagingClient.run() }
messagingClient.addMessageHandler(topic) { message, r ->
receivedMessages.add(message)
}
val message = messagingClient.createMessage(topic, DEFAULT_SESSION_ID, "first msg".toByteArray()) val message = messagingClient.createMessage(topic, DEFAULT_SESSION_ID, "first msg".toByteArray())
messagingClient.send(message, messagingClient.myAddress) messagingClient.send(message, messagingClient.myAddress)
@ -142,9 +141,88 @@ class ArtemisMessagingTests {
assertNull(receivedMessages.poll(200, MILLISECONDS)) assertNull(receivedMessages.poll(200, MILLISECONDS))
} }
@Test
fun `client should be able to send message to itself before network map is available, and receive after`() {
val settableFuture: SettableFuture<Unit> = SettableFuture.create()
networkMapRegistrationFuture = settableFuture
val receivedMessages = LinkedBlockingQueue<Message>()
val messagingClient = createAndStartClientAndServer(receivedMessages)
val message = messagingClient.createMessage(topic, DEFAULT_SESSION_ID, "first msg".toByteArray())
messagingClient.send(message, messagingClient.myAddress)
val networkMapMessage = messagingClient.createMessage(NetworkMapService.FETCH_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, "second msg".toByteArray())
messagingClient.send(networkMapMessage, messagingClient.myAddress)
val actual: Message = receivedMessages.take()
assertEquals("second msg", String(actual.data))
assertNull(receivedMessages.poll(200, MILLISECONDS))
settableFuture.set(Unit)
val firstActual: Message = receivedMessages.take()
assertEquals("first msg", String(firstActual.data))
assertNull(receivedMessages.poll(200, MILLISECONDS))
}
@Test
fun `client should be able to send large numbers of messages to itself before network map is available and survive restart, then receive messages`() {
// Crank the iteration up as high as you want... just takes longer to run.
val iterations = 100
val settableFuture: SettableFuture<Unit> = SettableFuture.create()
networkMapRegistrationFuture = settableFuture
val receivedMessages = LinkedBlockingQueue<Message>()
val messagingClient = createAndStartClientAndServer(receivedMessages)
for (iter in 1..iterations) {
val message = messagingClient.createMessage(topic, DEFAULT_SESSION_ID, "first msg $iter".toByteArray())
messagingClient.send(message, messagingClient.myAddress)
}
val networkMapMessage = messagingClient.createMessage(NetworkMapService.FETCH_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, "second msg".toByteArray())
messagingClient.send(networkMapMessage, messagingClient.myAddress)
val actual: Message = receivedMessages.take()
assertEquals("second msg", String(actual.data))
assertNull(receivedMessages.poll(200, MILLISECONDS))
// Stop client and server and create afresh.
messagingClient.stop()
messagingServer?.stop()
networkMapRegistrationFuture = Futures.immediateFuture(Unit)
createAndStartClientAndServer(receivedMessages)
for (iter in 1..iterations) {
val firstActual: Message = receivedMessages.take()
assertTrue(String(firstActual.data).equals("first msg $iter"))
}
assertNull(receivedMessages.poll(200, MILLISECONDS))
}
private fun startNodeMessagingClient() {
messagingClient!!.start(rpcOps, userService)
}
private fun createAndStartClientAndServer(receivedMessages: LinkedBlockingQueue<Message>): NodeMessagingClient {
createMessagingServer().start()
val messagingClient = createMessagingClient()
startNodeMessagingClient()
messagingClient.addMessageHandler(topic) { message, r ->
receivedMessages.add(message)
}
messagingClient.addMessageHandler(NetworkMapService.FETCH_PROTOCOL_TOPIC) { message, r ->
receivedMessages.add(message)
}
// Run after the handlers are added, otherwise (some of) the messages get delivered and discarded / dead-lettered.
thread { messagingClient.run() }
return messagingClient
}
private fun createMessagingClient(server: HostAndPort = hostAndPort): NodeMessagingClient { private fun createMessagingClient(server: HostAndPort = hostAndPort): NodeMessagingClient {
return databaseTransaction(database) { return databaseTransaction(database) {
NodeMessagingClient(config, server, identity.public.tree, AffinityExecutor.ServiceAffinityExecutor("ArtemisMessagingTests", 1), database).apply { NodeMessagingClient(config, server, identity.public.tree, AffinityExecutor.ServiceAffinityExecutor("ArtemisMessagingTests", 1), database, networkMapRegistrationFuture).apply {
configureWithDevSSLCertificate() configureWithDevSSLCertificate()
messagingClient = this messagingClient = this
} }