Address review comments from Shams.

Also, make ServiceAffinityExecutor subclass ThreadPoolExecutor instead of delegating to ScheduledThreadPoolExecutor. This fixes an issue with exception reporting.
This commit is contained in:
Mike Hearn 2016-04-25 17:26:32 +02:00
parent e5a0a211da
commit 9904d5bb6e
6 changed files with 67 additions and 67 deletions

@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory
import java.io.PrintWriter
import java.io.StringWriter
import java.util.*
import java.util.concurrent.atomic.AtomicBoolean
import javax.annotation.concurrent.ThreadSafe
/**
@ -239,13 +240,12 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec
persistCheckpoint(prevCheckpointKey, curPersistedBytes)
val newCheckpointKey = curPersistedBytes.sha256()
logger.trace { "Waiting for message of type ${request.responseType.name} on $topic" }
var consumed = false
val consumed = AtomicBoolean()
net.runOnNextMessage(topic, executor) { netMsg ->
// Some assertions to ensure we don't execute on the wrong thread or get executed more than once.
executor.checkOnThread()
check(netMsg.topic == topic) { "Topic mismatch: ${netMsg.topic} vs $topic" }
check(!consumed)
consumed = true
check(!consumed.getAndSet(true))
// TODO: This is insecure: we should not deserialise whatever we find and *then* check.
//
// We should instead verify as we read the data that it's what we are expecting and throw as early as

@ -17,7 +17,6 @@ import java.nio.file.FileAlreadyExistsException
import java.nio.file.Files
import java.nio.file.Path
import java.security.KeyPair
import java.security.PublicKey
import java.time.Clock
import java.util.*
@ -35,7 +34,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
// We will run as much stuff in this single thread as possible to keep the risk of thread safety bugs low during the
// low-performance prototyping period.
protected open val serverThread: AffinityExecutor = AffinityExecutor.ServiceAffinityExecutor("Node thread", 1)
protected abstract val serverThread: AffinityExecutor
// Objects in this list will be scanned by the DataUploadServlet and can be handed new data via HTTP.
// Don't mutate this after startup.
@ -130,7 +129,6 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
open fun stop() {
net.stop()
serverThread.shutdownNow()
}
protected abstract fun makeMessagingService(): MessagingService

@ -9,6 +9,7 @@ import core.messaging.MessagingService
import core.node.services.ArtemisMessagingService
import core.node.servlets.AttachmentDownloadServlet
import core.node.servlets.DataUploadServlet
import core.utilities.AffinityExecutor
import core.utilities.loggerFor
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.server.handler.HandlerCollection
@ -52,6 +53,8 @@ class Node(dir: Path, val p2pAddr: HostAndPort, configuration: NodeConfiguration
override val log = loggerFor<Node>()
override val serverThread = AffinityExecutor.ServiceAffinityExecutor("Node thread", 1)
lateinit var webServer: Server
// Avoid the lock being garbage collected. We don't really need to release it as the OS will do so for us
@ -135,6 +138,7 @@ class Node(dir: Path, val p2pAddr: HostAndPort, configuration: NodeConfiguration
webServer.stop()
super.stop()
nodeFileLock!!.release()
serverThread.shutdownNow()
}
private fun alreadyRunningNodeCheck() {

@ -168,7 +168,7 @@ class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort,
private fun deliverMessage(msg: Message): Boolean {
// Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added
// or removed whilst the filter is executing will not affect anything.
val deliverTo = handlers.filter { if (it.topic.isBlank()) true else it.topic == msg.topic }
val deliverTo = handlers.filter { it.topic.isBlank() || it.topic == msg.topic }
if (deliverTo.isEmpty()) {
// This should probably be downgraded to a trace in future, so the protocol can evolve with new topics

@ -1,7 +1,6 @@
package core.utilities
import com.google.common.util.concurrent.Uninterruptibles
import java.time.Duration
import java.util.*
import java.util.concurrent.*
import java.util.function.Supplier
@ -14,7 +13,7 @@ interface AffinityExecutor : Executor {
/** Returns true if the current thread is equal to the thread this executor is backed by. */
val isOnThread: Boolean
/** Throws an IllegalStateException if the current thread is equal to the thread this executor is backed by. */
/** Throws an IllegalStateException if the current thread is not one of the threads this executor is backed by. */
fun checkOnThread() {
if (!isOnThread)
throw IllegalStateException("On wrong thread: " + Thread.currentThread())
@ -28,9 +27,6 @@ interface AffinityExecutor : Executor {
execute(runnable)
}
/** Terminates any backing thread (pool) without waiting for tasks to finish. */
fun shutdownNow()
/**
* Runs the given function on the executor, blocking until the result is available. Be careful not to deadlock this
* way! Make sure the executor can't possibly be waiting for the calling thread.
@ -42,26 +38,29 @@ interface AffinityExecutor : Executor {
return CompletableFuture.supplyAsync(Supplier { fetcher() }, this).get()
}
/**
* Posts a no-op task to the executor and blocks this thread waiting for it to complete. This can be useful in
* tests when you want to be sure that a previous task submitted via [execute] has completed.
*/
fun flush() {
fetchFrom { }
}
/**
* An executor backed by thread pool (which may often have a single thread) which makes it easy to schedule
* tasks in the future and verify code is running on the executor.
*/
class ServiceAffinityExecutor(threadName: String, numThreads: Int) : AffinityExecutor {
class ServiceAffinityExecutor(threadName: String, numThreads: Int) : AffinityExecutor,
ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, LinkedBlockingQueue<Runnable>()) {
protected val threads = Collections.synchronizedSet(HashSet<Thread>())
private val handler = Thread.currentThread().uncaughtExceptionHandler
val service: ScheduledThreadPoolExecutor
private val uncaughtExceptionHandler = Thread.currentThread().uncaughtExceptionHandler
init {
val threadFactory = fun(runnable: Runnable): Thread {
setThreadFactory(fun(runnable: Runnable): Thread {
val thread = object : Thread() {
override fun run() {
try {
runnable.run()
} catch (e: Throwable) {
e.printStackTrace()
handler.uncaughtException(this, e)
throw e
} finally {
threads -= this
}
@ -71,29 +70,16 @@ interface AffinityExecutor : Executor {
thread.name = threadName
threads += thread
return thread
}
// The scheduled variant of the JDK thread pool doesn't do automatic calibration of the thread pool size,
// it always uses the 'core size'. So there is no point in allowing separate specification of core and max
// numbers of threads.
service = ScheduledThreadPoolExecutor(numThreads, threadFactory)
})
}
override fun afterExecute(r: Runnable, t: Throwable?) {
if (t != null)
uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), t)
}
override val isOnThread: Boolean get() = Thread.currentThread() in threads
override fun execute(command: Runnable) {
service.execute {
command.run()
}
}
fun <T> executeIn(time: Duration, command: () -> T): ScheduledFuture<T> {
return service.schedule(Callable { command() }, time.toMillis(), TimeUnit.MILLISECONDS)
}
override fun shutdownNow() {
service.shutdownNow()
}
companion object {
val logger = loggerFor<ServiceAffinityExecutor>()
}
@ -122,17 +108,12 @@ interface AffinityExecutor : Executor {
}
val taskQueueSize: Int get() = commandQ.size
override fun shutdownNow() {
}
}
companion object {
val SAME_THREAD: AffinityExecutor = object : AffinityExecutor {
override val isOnThread: Boolean get() = true
override fun execute(command: Runnable) = command.run()
override fun shutdownNow() {
}
}
}
}

@ -1,43 +1,60 @@
package core.utilities
import org.junit.After
import org.junit.Test
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference
import kotlin.concurrent.thread
import kotlin.test.assertEquals
import kotlin.test.assertFails
import kotlin.test.assertNotEquals
class AffinityExecutorTests {
@Test fun `direct thread executor works`() {
val thisThread = Thread.currentThread()
AffinityExecutor.SAME_THREAD.execute { assertEquals(thisThread, Thread.currentThread()) }
AffinityExecutor.SAME_THREAD.executeASAP { assertEquals(thisThread, Thread.currentThread()) }
@Test fun `AffinityExecutor SAME_THREAD executes on calling thread`() {
assert(AffinityExecutor.SAME_THREAD.isOnThread)
run {
val thatThread = CompletableFuture<Thread>()
AffinityExecutor.SAME_THREAD.execute { thatThread.complete(Thread.currentThread()) }
assertEquals(Thread.currentThread(), thatThread.get())
}
run {
val thatThread = CompletableFuture<Thread>()
AffinityExecutor.SAME_THREAD.executeASAP { thatThread.complete(Thread.currentThread()) }
assertEquals(Thread.currentThread(), thatThread.get())
}
}
@Test fun `single threaded affinity executor works`() {
var executor: AffinityExecutor.ServiceAffinityExecutor? = null
@After fun shutdown() {
executor?.shutdown()
}
@Test fun `single threaded affinity executor runs on correct thread`() {
val thisThread = Thread.currentThread()
val executor = AffinityExecutor.ServiceAffinityExecutor("test thread", 1)
assert(!executor.isOnThread)
assertFails { executor.checkOnThread() }
var thread: Thread? = null
val thread = AtomicReference<Thread>()
executor.execute {
assertNotEquals(thisThread, Thread.currentThread())
executor.checkOnThread()
thread = Thread.currentThread()
thread.set(Thread.currentThread())
}
val thread2 = AtomicReference<Thread>()
executor.execute {
assertEquals(thread, Thread.currentThread())
thread2.set(Thread.currentThread())
executor.checkOnThread()
}
executor.fetchFrom { } // Serialize
executor.service.shutdown()
executor.flush()
assertEquals(thread2.get(), thread.get())
}
@Test fun `pooled executor works`() {
@Test fun `pooled executor`() {
val executor = AffinityExecutor.ServiceAffinityExecutor("test2", 3)
assert(!executor.isOnThread)
@ -53,29 +70,29 @@ class AffinityExecutorTests {
}
blockAThread()
blockAThread()
executor.fetchFrom { } // Serialize
executor.flush()
assertEquals(2, threads.size)
executor.fetchFrom {
val numThreads = executor.fetchFrom {
assert(executor.isOnThread)
threads += Thread.currentThread()
assertEquals(3, threads.distinct().size)
threads.distinct().size
}
assertEquals(3, numThreads)
latch.countDown()
executor.fetchFrom { } // Serialize
executor.service.shutdown()
executor.flush()
}
@Volatile var exception: Throwable? = null
@Test fun exceptions() {
@Test fun `exceptions are reported to the specified handler`() {
val exception = AtomicReference<Throwable?>()
// Run in a separate thread to avoid messing with any default exception handlers in the unit test thread.
thread {
Thread.setDefaultUncaughtExceptionHandler { thread, throwable -> exception = throwable }
Thread.currentThread().setUncaughtExceptionHandler { thread, throwable -> exception.set(throwable) }
val executor = AffinityExecutor.ServiceAffinityExecutor("test3", 1)
executor.execute {
throw Exception("foo")
}
executor.fetchFrom { } // Serialize
assertEquals("foo", exception!!.message)
executor.flush()
}.join()
assertEquals("foo", exception.get()?.message)
}
}