Unit tests: fix a couple of threading issues for async unit tests that use the in memory network

This commit is contained in:
Mike Hearn 2016-02-16 17:07:36 +01:00
parent de4427c240
commit ed72e9b997
2 changed files with 46 additions and 48 deletions

View File

@ -90,7 +90,7 @@ inline fun <T> logElapsedTime(label: String, logger: Logger? = null, body: () ->
*
* val ii = state.locked { i }
*/
class ThreadBox<T>(content: T, private val lock: Lock = ReentrantLock()) {
private val content = content
fun <R> locked(body: T.() -> R): R = lock.withLock { body(content) }
class ThreadBox<T>(content: T, val lock: Lock = ReentrantLock()) {
val content = content
inline fun <R> locked(body: T.() -> R): R = lock.withLock { body(content) }
}

View File

@ -12,6 +12,7 @@ import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import core.DummyTimestampingAuthority
import core.ThreadBox
import core.crypto.sha256
import core.node.TimestamperNodeService
import core.utilities.loggerFor
@ -19,7 +20,6 @@ import java.time.Instant
import java.util.*
import java.util.concurrent.Executor
import java.util.concurrent.LinkedBlockingQueue
import javax.annotation.concurrent.GuardedBy
import javax.annotation.concurrent.ThreadSafe
import kotlin.concurrent.thread
@ -31,7 +31,7 @@ import kotlin.concurrent.thread
* testing).
*/
@ThreadSafe
public class InMemoryNetwork {
class InMemoryNetwork {
private var counter = 0 // -1 means stopped.
private val handleNodeMap = HashMap<Handle, InMemoryNode>()
// All messages are kept here until the messages are pumped off the queue by a caller to the node class.
@ -53,7 +53,7 @@ public class InMemoryNetwork {
*/
@Synchronized
fun createNode(manuallyPumped: Boolean): Pair<Handle, MessagingServiceBuilder<InMemoryNode>> {
check(counter >= 0) { "In memory network stopped: please recreate. "}
check(counter >= 0) { "In memory network stopped: please recreate."}
val builder = createNodeWithID(manuallyPumped, counter) as Builder
counter++
val id = builder.id
@ -90,14 +90,15 @@ public class InMemoryNetwork {
val everyoneOnline: AllPossibleRecipients = object : AllPossibleRecipients {}
@Synchronized
fun stop() {
// toArrayList here just copies the collection, which we need because node.stop() will delete itself from
// the network map by calling netNodeHasShutdown. So we would get a CoModException if we didn't copy first.
for (node in handleNodeMap.values.toMutableList())
val nodes = synchronized(this) {
counter = -1
handleNodeMap.values.toList()
}
for (node in nodes)
node.stop()
counter = -1
handleNodeMap.clear()
messageQueues.clear()
}
@ -137,58 +138,60 @@ public class InMemoryNetwork {
*
* An instance can be obtained by creating a builder and then using the start method.
*/
@ThreadSafe
inner class InMemoryNode(private val manuallyPumped: Boolean, private val handle: Handle): MessagingService {
inner class Handler(val executor: Executor?, val topic: String,
val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
@GuardedBy("this")
protected val handlers: MutableList<Handler> = ArrayList()
@GuardedBy("this")
@Volatile
protected var running = true
@GuardedBy("this")
protected val pendingRedelivery = LinkedList<Message>()
protected inner class InnerState {
val handlers: MutableList<Handler> = ArrayList()
val pendingRedelivery = LinkedList<Message>()
}
protected val state = ThreadBox(InnerState())
override val myAddress: SingleMessageRecipient = handle
protected val backgroundThread = if (manuallyPumped) null else
thread(isDaemon = true, name = "In-memory message dispatcher ") {
thread(isDaemon = true, name = "In-memory message dispatcher") {
while (!Thread.currentThread().isInterrupted) {
try {
pumpInternal(true)
} catch(e: InterruptedException) {
if (synchronized(this) { running })
throw e
break
}
}
}
@Synchronized
override fun addMessageHandler(topic: String, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration {
checkRunning()
check(running)
val (handler, items) = state.locked {
val handler = Handler(executor, topic, callback).apply { handlers.add(this) }
if (pendingRedelivery.isNotEmpty()) {
val items = ArrayList(pendingRedelivery)
pendingRedelivery.clear()
items.forEach { netSend(it, handle) }
Pair(handler, items)
}
for (it in items)
netSend(it, handle)
return handler
}
@Synchronized
override fun removeMessageHandler(registration: MessageHandlerRegistration) {
checkRunning()
check(handlers.remove(registration as Handler))
check(running)
state.locked { check(handlers.remove(registration as Handler)) }
}
@Synchronized
override fun send(message: Message, target: MessageRecipients) {
checkRunning()
check(running)
netSend(message, target)
}
@Synchronized
override fun stop() {
running = false
backgroundThread?.interrupt()
if (backgroundThread != null) {
backgroundThread.interrupt()
backgroundThread.join()
}
netNodeHasShutdown(handle)
}
@ -212,13 +215,8 @@ public class InMemoryNetwork {
*/
fun pump(block: Boolean): Boolean {
check(manuallyPumped)
checkRunning()
return pumpInternal(block)
}
@Synchronized
private fun checkRunning() {
check(running)
return pumpInternal(block)
}
private fun pumpInternal(block: Boolean): Boolean {
@ -228,22 +226,22 @@ public class InMemoryNetwork {
if (message == null)
return false
val deliverTo = synchronized(this) {
handlers.filter { if (it.topic.isBlank()) true else message.topic == it.topic }
}
val deliverTo = state.locked {
val h = handlers.filter { if (it.topic.isBlank()) true else message.topic == it.topic }
if (deliverTo.isEmpty()) {
if (h.isEmpty()) {
// Got no handlers for this message yet. Keep the message around and attempt redelivery after a new
// handler has been registered. The purpose of this path is to make unit tests that have multi-threading
// reliable, as a sender may attempt to send a message to a receiver that hasn't finished setting
// up a handler for yet. Most unit tests don't run threaded, but we want to test true parallelism at
// least sometimes.
synchronized(this) {
pendingRedelivery.add(message)
}
return false
}
h
}
for (handler in deliverTo) {
// Now deliver via the requested executor, or on this thread if no executor was provided at registration time.
(handler.executor ?: MoreExecutors.directExecutor()).execute {