Do not start the P2P consumer until we have at least one registered handler (the state machine). This prevents message being delivered too early. (#4988)

Consider shutdown logic if network map doesn't finish
This commit is contained in:
Matthew Nesbit 2019-04-05 15:55:45 +01:00 committed by GitHub
parent 685f94bf66
commit 76d738c452
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -133,6 +133,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
private val delayStartQueues = Collections.newSetFromMap(ConcurrentHashMap<String, Boolean>())
private val handlers = ConcurrentHashMap<String, MessageHandler>()
private val handlersChangedSignal = java.lang.Object()
private val deduplicator = P2PMessageDeduplicator(cacheFactory, database)
internal var messagingExecutor: MessagingExecutor? = null
@ -306,6 +307,11 @@ class P2PMessagingClient(val config: NodeConfiguration,
fun run() {
val latch = CountDownLatch(1)
try {
synchronized(handlersChangedSignal) {
while (handlers.isEmpty() && state.locked { (p2pConsumer != null) }) {
handlersChangedSignal.wait()
}
}
val consumer = state.locked {
check(started) { "start must be called first" }
check(!running) { "run can't be called twice" }
@ -441,6 +447,9 @@ class P2PMessagingClient(val config: NodeConfiguration,
eventsSubscription = null
prevRunning
}
synchronized(handlersChangedSignal) {
handlersChangedSignal.notifyAll()
}
if (running && !nodeExecutor.isOnThread) {
// Wait for the main loop to notice the consumer has gone and finish up.
shutdownLatch.await()
@ -528,6 +537,9 @@ class P2PMessagingClient(val config: NodeConfiguration,
}
callback
}
synchronized(handlersChangedSignal) {
handlersChangedSignal.notifyAll()
}
return HandlerRegistration(topic, callback)
}