mirror of
https://github.com/corda/corda.git
synced 2025-02-21 09:51:57 +00:00
Show origin test in ThreadLeakException. (#2143)
and downgrade an error to warn
This commit is contained in:
parent
ff9e7474b1
commit
74bf00c155
@ -43,7 +43,7 @@ class ThreadLocalToggleField<T>(name: String) : ToggleField<T>(name) {
|
||||
}
|
||||
|
||||
/** The named thread has leaked from a previous test. */
|
||||
class ThreadLeakException : RuntimeException("Leaked thread detected: ${Thread.currentThread().name}")
|
||||
class ThreadLeakException(valueToString: String) : RuntimeException("Leaked thread '${Thread.currentThread().name}' detected, value was: $valueToString")
|
||||
|
||||
/** @param isAGlobalThreadBeingCreated whether a global thread (that should not inherit any value) is being created. */
|
||||
class InheritableThreadLocalToggleField<T>(name: String,
|
||||
@ -54,16 +54,12 @@ class InheritableThreadLocalToggleField<T>(name: String,
|
||||
}
|
||||
|
||||
private inner class Holder(value: T) : AtomicReference<T?>(value) {
|
||||
fun valueOrDeclareLeak() = get() ?: throw ThreadLeakException()
|
||||
private val valueToString = value.toString() // We never set another non-null value.
|
||||
fun valueOrDeclareLeak() = get() ?: throw ThreadLeakException(valueToString)
|
||||
fun childValue(): Holder? {
|
||||
val e = ThreadLeakException() // Expensive, but so is starting the new thread.
|
||||
return if (isAGlobalThreadBeingCreated(e.stackTrace)) {
|
||||
get() ?: log.warn(e.message)
|
||||
null
|
||||
} else {
|
||||
get() ?: log.error(e.message)
|
||||
this
|
||||
}
|
||||
val e = ThreadLeakException(valueToString) // Expensive, but so is starting the new thread.
|
||||
get() ?: log.warn(e.message)
|
||||
return if (isAGlobalThreadBeingCreated(e.stackTrace)) null else this
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -16,7 +16,7 @@ interface SerializationEnvironment {
|
||||
val checkpointContext: SerializationContext
|
||||
}
|
||||
|
||||
class SerializationEnvironmentImpl(
|
||||
open class SerializationEnvironmentImpl(
|
||||
override val serializationFactory: SerializationFactory,
|
||||
override val p2pContext: SerializationContext,
|
||||
rpcServerContext: SerializationContext? = null,
|
||||
|
@ -134,6 +134,7 @@ class ToggleFieldTest {
|
||||
assertThatThrownBy { future.getOrThrow() }
|
||||
.isInstanceOf(ThreadLeakException::class.java)
|
||||
.hasMessageContaining(threadName)
|
||||
.hasMessageContaining("hello")
|
||||
}
|
||||
}
|
||||
withSingleThreadExecutor {
|
||||
@ -141,9 +142,9 @@ class ToggleFieldTest {
|
||||
}
|
||||
}
|
||||
|
||||
/** We log an error rather than failing-fast as the new thread may be an undetected global. */
|
||||
/** We log a warning rather than failing-fast as the new thread may be an undetected global. */
|
||||
@Test
|
||||
fun `leaked thread propagates holder to non-global thread, with error`() {
|
||||
fun `leaked thread propagates holder to non-global thread, with warning`() {
|
||||
val field = inheritableThreadLocalToggleField<String>()
|
||||
field.set("hello")
|
||||
withSingleThreadExecutor {
|
||||
@ -153,17 +154,18 @@ class ToggleFieldTest {
|
||||
val leakedThreadName = Thread.currentThread().name
|
||||
verifyNoMoreInteractions(log)
|
||||
withSingleThreadExecutor {
|
||||
// If ThreadLeakException is seen in practice, these errors form a trail of where the holder has been:
|
||||
verify(log).error(argThat { contains(leakedThreadName) })
|
||||
// If ThreadLeakException is seen in practice, these warnings form a trail of where the holder has been:
|
||||
verify(log).warn(argThat { contains(leakedThreadName) && contains("hello") })
|
||||
val newThreadName = fork { Thread.currentThread().name }.getOrThrow()
|
||||
val future = fork(field::get)
|
||||
assertThatThrownBy { future.getOrThrow() }
|
||||
.isInstanceOf(ThreadLeakException::class.java)
|
||||
.hasMessageContaining(newThreadName)
|
||||
.hasMessageContaining("hello")
|
||||
fork {
|
||||
verifyNoMoreInteractions(log)
|
||||
withSingleThreadExecutor {
|
||||
verify(log).error(argThat { contains(newThreadName) })
|
||||
verify(log).warn(argThat { contains(newThreadName) && contains("hello") })
|
||||
}
|
||||
}.getOrThrow()
|
||||
}
|
||||
@ -183,7 +185,7 @@ class ToggleFieldTest {
|
||||
globalThreadCreationMethod {
|
||||
verifyNoMoreInteractions(log)
|
||||
withSingleThreadExecutor {
|
||||
verify(log).warn(argThat { contains(leakedThreadName) })
|
||||
verify(log).warn(argThat { contains(leakedThreadName) && contains("hello") })
|
||||
// In practice the new thread is for example a static thread we can't get rid of:
|
||||
assertNull(fork(field::get).getOrThrow())
|
||||
}
|
||||
|
@ -15,10 +15,13 @@ import org.junit.runners.model.Statement
|
||||
|
||||
/** @param inheritable whether new threads inherit the environment, use sparingly. */
|
||||
class SerializationEnvironmentRule(private val inheritable: Boolean = false) : TestRule {
|
||||
val env: SerializationEnvironment = createTestSerializationEnv()
|
||||
override fun apply(base: Statement, description: Description?) = object : Statement() {
|
||||
override fun evaluate() = env.asContextEnv(inheritable) {
|
||||
base.evaluate()
|
||||
lateinit var env: SerializationEnvironment
|
||||
override fun apply(base: Statement, description: Description): Statement {
|
||||
env = createTestSerializationEnv(description.toString())
|
||||
return object : Statement() {
|
||||
override fun evaluate() = env.asContextEnv(inheritable) {
|
||||
base.evaluate()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -30,7 +33,7 @@ interface GlobalSerializationEnvironment : SerializationEnvironment {
|
||||
|
||||
/** @param inheritable whether new threads inherit the environment, use sparingly. */
|
||||
fun <T> withTestSerialization(inheritable: Boolean = false, callable: (SerializationEnvironment) -> T): T {
|
||||
return createTestSerializationEnv().asContextEnv(inheritable, callable)
|
||||
return createTestSerializationEnv("<context>").asContextEnv(inheritable, callable)
|
||||
}
|
||||
|
||||
/**
|
||||
@ -53,7 +56,7 @@ fun <T> withoutTestSerialization(callable: () -> T): T {
|
||||
*/
|
||||
fun setGlobalSerialization(armed: Boolean): GlobalSerializationEnvironment {
|
||||
return if (armed) {
|
||||
object : GlobalSerializationEnvironment, SerializationEnvironment by createTestSerializationEnv() {
|
||||
object : GlobalSerializationEnvironment, SerializationEnvironment by createTestSerializationEnv("<global>") {
|
||||
override fun unset() {
|
||||
_globalSerializationEnv.set(null)
|
||||
}
|
||||
@ -67,7 +70,7 @@ fun setGlobalSerialization(armed: Boolean): GlobalSerializationEnvironment {
|
||||
}
|
||||
}
|
||||
|
||||
private fun createTestSerializationEnv() = SerializationEnvironmentImpl(
|
||||
private fun createTestSerializationEnv(label: String) = object : SerializationEnvironmentImpl(
|
||||
SerializationFactoryImpl().apply {
|
||||
registerScheme(KryoClientSerializationScheme())
|
||||
registerScheme(KryoServerSerializationScheme())
|
||||
@ -78,7 +81,9 @@ private fun createTestSerializationEnv() = SerializationEnvironmentImpl(
|
||||
KRYO_RPC_SERVER_CONTEXT,
|
||||
KRYO_RPC_CLIENT_CONTEXT,
|
||||
if (isAmqpEnabled()) AMQP_STORAGE_CONTEXT else KRYO_STORAGE_CONTEXT,
|
||||
KRYO_CHECKPOINT_CONTEXT)
|
||||
KRYO_CHECKPOINT_CONTEXT) {
|
||||
override fun toString() = "testSerializationEnv($label)"
|
||||
}
|
||||
|
||||
private const val AMQP_ENABLE_PROP_NAME = "net.corda.testing.amqp.enable"
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user