From 9904d5bb6eb31f11898f262b9e9dcfc17b9eb3b2 Mon Sep 17 00:00:00 2001 From: Mike Hearn Date: Mon, 25 Apr 2016 17:26:32 +0200 Subject: [PATCH] Address review comments from Shams. Also, make ServiceAffinityExecutor subclass ThreadPoolExecutor instead of delegating to ScheduledThreadPoolExecutor. This fixes an issue with exception reporting. --- .../core/messaging/StateMachineManager.kt | 6 +- src/main/kotlin/core/node/AbstractNode.kt | 4 +- src/main/kotlin/core/node/Node.kt | 4 ++ .../node/services/ArtemisMessagingService.kt | 2 +- .../kotlin/core/utilities/AffinityExecutor.kt | 57 ++++++----------- .../core/utilities/AffinityExecutorTests.kt | 61 ++++++++++++------- 6 files changed, 67 insertions(+), 67 deletions(-) diff --git a/src/main/kotlin/core/messaging/StateMachineManager.kt b/src/main/kotlin/core/messaging/StateMachineManager.kt index 8df2b2290d..788408cb1b 100644 --- a/src/main/kotlin/core/messaging/StateMachineManager.kt +++ b/src/main/kotlin/core/messaging/StateMachineManager.kt @@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory import java.io.PrintWriter import java.io.StringWriter import java.util.* +import java.util.concurrent.atomic.AtomicBoolean import javax.annotation.concurrent.ThreadSafe /** @@ -239,13 +240,12 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec persistCheckpoint(prevCheckpointKey, curPersistedBytes) val newCheckpointKey = curPersistedBytes.sha256() logger.trace { "Waiting for message of type ${request.responseType.name} on $topic" } - var consumed = false + val consumed = AtomicBoolean() net.runOnNextMessage(topic, executor) { netMsg -> // Some assertions to ensure we don't execute on the wrong thread or get executed more than once. executor.checkOnThread() check(netMsg.topic == topic) { "Topic mismatch: ${netMsg.topic} vs $topic" } - check(!consumed) - consumed = true + check(!consumed.getAndSet(true)) // TODO: This is insecure: we should not deserialise whatever we find and *then* check. // // We should instead verify as we read the data that it's what we are expecting and throw as early as diff --git a/src/main/kotlin/core/node/AbstractNode.kt b/src/main/kotlin/core/node/AbstractNode.kt index 70522ef072..796bbff4e7 100644 --- a/src/main/kotlin/core/node/AbstractNode.kt +++ b/src/main/kotlin/core/node/AbstractNode.kt @@ -17,7 +17,6 @@ import java.nio.file.FileAlreadyExistsException import java.nio.file.Files import java.nio.file.Path import java.security.KeyPair -import java.security.PublicKey import java.time.Clock import java.util.* @@ -35,7 +34,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, // We will run as much stuff in this single thread as possible to keep the risk of thread safety bugs low during the // low-performance prototyping period. - protected open val serverThread: AffinityExecutor = AffinityExecutor.ServiceAffinityExecutor("Node thread", 1) + protected abstract val serverThread: AffinityExecutor // Objects in this list will be scanned by the DataUploadServlet and can be handed new data via HTTP. // Don't mutate this after startup. @@ -130,7 +129,6 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, open fun stop() { net.stop() - serverThread.shutdownNow() } protected abstract fun makeMessagingService(): MessagingService diff --git a/src/main/kotlin/core/node/Node.kt b/src/main/kotlin/core/node/Node.kt index 71e788896f..0d53926907 100644 --- a/src/main/kotlin/core/node/Node.kt +++ b/src/main/kotlin/core/node/Node.kt @@ -9,6 +9,7 @@ import core.messaging.MessagingService import core.node.services.ArtemisMessagingService import core.node.servlets.AttachmentDownloadServlet import core.node.servlets.DataUploadServlet +import core.utilities.AffinityExecutor import core.utilities.loggerFor import org.eclipse.jetty.server.Server import org.eclipse.jetty.server.handler.HandlerCollection @@ -52,6 +53,8 @@ class Node(dir: Path, val p2pAddr: HostAndPort, configuration: NodeConfiguration override val log = loggerFor() + override val serverThread = AffinityExecutor.ServiceAffinityExecutor("Node thread", 1) + lateinit var webServer: Server // Avoid the lock being garbage collected. We don't really need to release it as the OS will do so for us @@ -135,6 +138,7 @@ class Node(dir: Path, val p2pAddr: HostAndPort, configuration: NodeConfiguration webServer.stop() super.stop() nodeFileLock!!.release() + serverThread.shutdownNow() } private fun alreadyRunningNodeCheck() { diff --git a/src/main/kotlin/core/node/services/ArtemisMessagingService.kt b/src/main/kotlin/core/node/services/ArtemisMessagingService.kt index 13c16c6fef..914c024845 100644 --- a/src/main/kotlin/core/node/services/ArtemisMessagingService.kt +++ b/src/main/kotlin/core/node/services/ArtemisMessagingService.kt @@ -168,7 +168,7 @@ class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort, private fun deliverMessage(msg: Message): Boolean { // Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added // or removed whilst the filter is executing will not affect anything. - val deliverTo = handlers.filter { if (it.topic.isBlank()) true else it.topic == msg.topic } + val deliverTo = handlers.filter { it.topic.isBlank() || it.topic == msg.topic } if (deliverTo.isEmpty()) { // This should probably be downgraded to a trace in future, so the protocol can evolve with new topics diff --git a/src/main/kotlin/core/utilities/AffinityExecutor.kt b/src/main/kotlin/core/utilities/AffinityExecutor.kt index 67da923863..5b16276002 100644 --- a/src/main/kotlin/core/utilities/AffinityExecutor.kt +++ b/src/main/kotlin/core/utilities/AffinityExecutor.kt @@ -1,7 +1,6 @@ package core.utilities import com.google.common.util.concurrent.Uninterruptibles -import java.time.Duration import java.util.* import java.util.concurrent.* import java.util.function.Supplier @@ -14,7 +13,7 @@ interface AffinityExecutor : Executor { /** Returns true if the current thread is equal to the thread this executor is backed by. */ val isOnThread: Boolean - /** Throws an IllegalStateException if the current thread is equal to the thread this executor is backed by. */ + /** Throws an IllegalStateException if the current thread is not one of the threads this executor is backed by. */ fun checkOnThread() { if (!isOnThread) throw IllegalStateException("On wrong thread: " + Thread.currentThread()) @@ -28,9 +27,6 @@ interface AffinityExecutor : Executor { execute(runnable) } - /** Terminates any backing thread (pool) without waiting for tasks to finish. */ - fun shutdownNow() - /** * Runs the given function on the executor, blocking until the result is available. Be careful not to deadlock this * way! Make sure the executor can't possibly be waiting for the calling thread. @@ -42,26 +38,29 @@ interface AffinityExecutor : Executor { return CompletableFuture.supplyAsync(Supplier { fetcher() }, this).get() } + /** + * Posts a no-op task to the executor and blocks this thread waiting for it to complete. This can be useful in + * tests when you want to be sure that a previous task submitted via [execute] has completed. + */ + fun flush() { + fetchFrom { } + } + /** * An executor backed by thread pool (which may often have a single thread) which makes it easy to schedule * tasks in the future and verify code is running on the executor. */ - class ServiceAffinityExecutor(threadName: String, numThreads: Int) : AffinityExecutor { + class ServiceAffinityExecutor(threadName: String, numThreads: Int) : AffinityExecutor, + ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, LinkedBlockingQueue()) { protected val threads = Collections.synchronizedSet(HashSet()) - - private val handler = Thread.currentThread().uncaughtExceptionHandler - val service: ScheduledThreadPoolExecutor + private val uncaughtExceptionHandler = Thread.currentThread().uncaughtExceptionHandler init { - val threadFactory = fun(runnable: Runnable): Thread { + setThreadFactory(fun(runnable: Runnable): Thread { val thread = object : Thread() { override fun run() { try { runnable.run() - } catch (e: Throwable) { - e.printStackTrace() - handler.uncaughtException(this, e) - throw e } finally { threads -= this } @@ -71,29 +70,16 @@ interface AffinityExecutor : Executor { thread.name = threadName threads += thread return thread - } - // The scheduled variant of the JDK thread pool doesn't do automatic calibration of the thread pool size, - // it always uses the 'core size'. So there is no point in allowing separate specification of core and max - // numbers of threads. - service = ScheduledThreadPoolExecutor(numThreads, threadFactory) + }) + } + + override fun afterExecute(r: Runnable, t: Throwable?) { + if (t != null) + uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), t) } override val isOnThread: Boolean get() = Thread.currentThread() in threads - override fun execute(command: Runnable) { - service.execute { - command.run() - } - } - - fun executeIn(time: Duration, command: () -> T): ScheduledFuture { - return service.schedule(Callable { command() }, time.toMillis(), TimeUnit.MILLISECONDS) - } - - override fun shutdownNow() { - service.shutdownNow() - } - companion object { val logger = loggerFor() } @@ -122,17 +108,12 @@ interface AffinityExecutor : Executor { } val taskQueueSize: Int get() = commandQ.size - - override fun shutdownNow() { - } } companion object { val SAME_THREAD: AffinityExecutor = object : AffinityExecutor { override val isOnThread: Boolean get() = true override fun execute(command: Runnable) = command.run() - override fun shutdownNow() { - } } } } \ No newline at end of file diff --git a/src/test/kotlin/core/utilities/AffinityExecutorTests.kt b/src/test/kotlin/core/utilities/AffinityExecutorTests.kt index a15c486172..9b8a760c3b 100644 --- a/src/test/kotlin/core/utilities/AffinityExecutorTests.kt +++ b/src/test/kotlin/core/utilities/AffinityExecutorTests.kt @@ -1,43 +1,60 @@ package core.utilities +import org.junit.After import org.junit.Test import java.util.* +import java.util.concurrent.CompletableFuture import java.util.concurrent.CountDownLatch +import java.util.concurrent.atomic.AtomicReference import kotlin.concurrent.thread import kotlin.test.assertEquals import kotlin.test.assertFails import kotlin.test.assertNotEquals class AffinityExecutorTests { - @Test fun `direct thread executor works`() { - val thisThread = Thread.currentThread() - AffinityExecutor.SAME_THREAD.execute { assertEquals(thisThread, Thread.currentThread()) } - AffinityExecutor.SAME_THREAD.executeASAP { assertEquals(thisThread, Thread.currentThread()) } + @Test fun `AffinityExecutor SAME_THREAD executes on calling thread`() { assert(AffinityExecutor.SAME_THREAD.isOnThread) + + run { + val thatThread = CompletableFuture() + AffinityExecutor.SAME_THREAD.execute { thatThread.complete(Thread.currentThread()) } + assertEquals(Thread.currentThread(), thatThread.get()) + } + run { + val thatThread = CompletableFuture() + AffinityExecutor.SAME_THREAD.executeASAP { thatThread.complete(Thread.currentThread()) } + assertEquals(Thread.currentThread(), thatThread.get()) + } } - @Test fun `single threaded affinity executor works`() { + var executor: AffinityExecutor.ServiceAffinityExecutor? = null + + @After fun shutdown() { + executor?.shutdown() + } + + @Test fun `single threaded affinity executor runs on correct thread`() { val thisThread = Thread.currentThread() val executor = AffinityExecutor.ServiceAffinityExecutor("test thread", 1) assert(!executor.isOnThread) assertFails { executor.checkOnThread() } - var thread: Thread? = null + val thread = AtomicReference() executor.execute { assertNotEquals(thisThread, Thread.currentThread()) executor.checkOnThread() - thread = Thread.currentThread() + thread.set(Thread.currentThread()) } + val thread2 = AtomicReference() executor.execute { - assertEquals(thread, Thread.currentThread()) + thread2.set(Thread.currentThread()) executor.checkOnThread() } - executor.fetchFrom { } // Serialize - - executor.service.shutdown() + executor.flush() + assertEquals(thread2.get(), thread.get()) } - @Test fun `pooled executor works`() { + @Test fun `pooled executor`() { val executor = AffinityExecutor.ServiceAffinityExecutor("test2", 3) assert(!executor.isOnThread) @@ -53,29 +70,29 @@ class AffinityExecutorTests { } blockAThread() blockAThread() - executor.fetchFrom { } // Serialize + executor.flush() assertEquals(2, threads.size) - executor.fetchFrom { + val numThreads = executor.fetchFrom { assert(executor.isOnThread) threads += Thread.currentThread() - assertEquals(3, threads.distinct().size) + threads.distinct().size } + assertEquals(3, numThreads) latch.countDown() - executor.fetchFrom { } // Serialize - executor.service.shutdown() + executor.flush() } - @Volatile var exception: Throwable? = null - @Test fun exceptions() { + @Test fun `exceptions are reported to the specified handler`() { + val exception = AtomicReference() // Run in a separate thread to avoid messing with any default exception handlers in the unit test thread. thread { - Thread.setDefaultUncaughtExceptionHandler { thread, throwable -> exception = throwable } + Thread.currentThread().setUncaughtExceptionHandler { thread, throwable -> exception.set(throwable) } val executor = AffinityExecutor.ServiceAffinityExecutor("test3", 1) executor.execute { throw Exception("foo") } - executor.fetchFrom { } // Serialize - assertEquals("foo", exception!!.message) + executor.flush() }.join() + assertEquals("foo", exception.get()?.message) } } \ No newline at end of file