CORDA-716 Don't allow the netty global executor to inherit serialization env holder (#2048)

and close some dangling RPC connections.
This commit is contained in:
Andrzej Cichocki 2017-11-15 11:22:35 +00:00 committed by GitHub
parent 2577c75f28
commit c4a9320e70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 69 additions and 29 deletions

View File

@ -1,6 +1,7 @@
package net.corda.core.internal
import net.corda.core.utilities.loggerFor
import org.slf4j.Logger
import java.util.concurrent.atomic.AtomicReference
import kotlin.reflect.KProperty
@ -44,33 +45,25 @@ class ThreadLocalToggleField<T>(name: String) : ToggleField<T>(name) {
/** The named thread has leaked from a previous test. */
class ThreadLeakException : RuntimeException("Leaked thread detected: ${Thread.currentThread().name}")
class InheritableThreadLocalToggleField<T>(name: String) : ToggleField<T>(name) {
companion object {
private val log = loggerFor<InheritableThreadLocalToggleField<*>>()
private fun ThreadLeakException.isProblematic(): Boolean {
stackTrace.forEach {
// A dying Netty thread's death event restarting the Netty global executor:
it.className == "io.netty.util.concurrent.GlobalEventExecutor" && it.methodName == "startThread" && return false
}
return true
}
}
private class Holder<T>(value: T) : AtomicReference<T?>(value) {
/** @param exceptionHandler should throw the exception, or may return normally to suppress inheritance. */
class InheritableThreadLocalToggleField<T>(name: String,
private val log: Logger = loggerFor<InheritableThreadLocalToggleField<*>>(),
private val exceptionHandler: (ThreadLeakException) -> Unit = { throw it }) : ToggleField<T>(name) {
private inner class Holder(value: T) : AtomicReference<T?>(value) {
fun valueOrDeclareLeak() = get() ?: throw ThreadLeakException()
fun maybeFailFastIfCurrentThreadIsLeaked() {
get() != null && return // Current thread isn't leaked.
fun childValue(): Holder? {
get() != null && return this // Current thread isn't leaked.
val e = ThreadLeakException()
e.isProblematic() && throw e
log.warn(e.message) // The exception on value retrieval is still enabled.
exceptionHandler(e)
log.warn(e.message)
return null
}
}
private val threadLocal = object : InheritableThreadLocal<Holder<T>?>() {
override fun childValue(holder: Holder<T>?): Holder<T>? {
// The Holder itself may be null due to prior events, a leak is not implied in that case:
holder?.maybeFailFastIfCurrentThreadIsLeaked()
return holder // What super does.
private val threadLocal = object : InheritableThreadLocal<Holder?>() {
override fun childValue(holder: InheritableThreadLocalToggleField<T>.Holder?): InheritableThreadLocalToggleField<T>.Holder? {
// The Holder itself may be null due to prior events, a leak is not indicated in that case:
return holder?.childValue()
}
}

View File

@ -43,7 +43,13 @@ val _globalSerializationEnv = SimpleToggleField<SerializationEnvironment>("globa
@VisibleForTesting
val _contextSerializationEnv = ThreadLocalToggleField<SerializationEnvironment>("contextSerializationEnv")
@VisibleForTesting
val _inheritableContextSerializationEnv = InheritableThreadLocalToggleField<SerializationEnvironment>("inheritableContextSerializationEnv")
val _inheritableContextSerializationEnv = InheritableThreadLocalToggleField<SerializationEnvironment>("inheritableContextSerializationEnv") suppressInherit@ {
it.stackTrace.forEach {
// A dying Netty thread's death event restarting the Netty global executor:
it.className == "io.netty.util.concurrent.GlobalEventExecutor" && it.methodName == "startThread" && return@suppressInherit
}
throw it
}
private val serializationEnvProperties = listOf(_nodeSerializationEnv, _globalSerializationEnv, _contextSerializationEnv, _inheritableContextSerializationEnv)
val effectiveSerializationEnv: SerializationEnvironment
get() = serializationEnvProperties.map { Pair(it, it.get()) }.filter { it.second != null }.run {

View File

@ -1,9 +1,14 @@
package net.corda.core.internal
import com.nhaarman.mockito_kotlin.argThat
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.verify
import com.nhaarman.mockito_kotlin.verifyNoMoreInteractions
import net.corda.core.internal.concurrent.fork
import net.corda.core.utilities.getOrThrow
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Test
import org.slf4j.Logger
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
@ -88,8 +93,16 @@ class ToggleFieldTest {
}
@Test
fun `inherited values are poisoned on clear`() {
val field = InheritableThreadLocalToggleField<String>("field")
fun `with default exception handler, inherited values are poisoned on clear`() {
`inherited values are poisoned on clear`(InheritableThreadLocalToggleField("field") { throw it })
}
@Test
fun `with lenient exception handler, inherited values are poisoned on clear`() {
`inherited values are poisoned on clear`(InheritableThreadLocalToggleField("field") {})
}
private fun `inherited values are poisoned on clear`(field: InheritableThreadLocalToggleField<String>) {
field.set("hello")
withSingleThreadExecutor {
assertEquals("hello", fork(field::get).getOrThrow())
@ -109,8 +122,8 @@ class ToggleFieldTest {
}
@Test
fun `leaked thread is detected as soon as it tries to create another`() {
val field = InheritableThreadLocalToggleField<String>("field")
fun `with default exception handler, leaked thread is detected as soon as it tries to create another`() {
val field = InheritableThreadLocalToggleField<String>("field") { throw it }
field.set("hello")
withSingleThreadExecutor {
assertEquals("hello", fork(field::get).getOrThrow())
@ -122,4 +135,25 @@ class ToggleFieldTest {
.hasMessageContaining(threadName)
}
}
@Test
fun `with lenient exception handler, leaked thread logs a warning and does not propagate the holder`() {
val log = mock<Logger>()
val field = InheritableThreadLocalToggleField<String>("field", log) {}
field.set("hello")
withSingleThreadExecutor {
assertEquals("hello", fork(field::get).getOrThrow())
field.set(null) // The executor thread is now considered leaked.
val threadName = fork { Thread.currentThread().name }.getOrThrow()
fork {
verifyNoMoreInteractions(log)
withSingleThreadExecutor {
verify(log).warn(argThat { contains(threadName) })
// In practice the new thread is for example a static thread we can't get rid of:
assertNull(fork(field::get).getOrThrow())
}
}.getOrThrow()
}
verifyNoMoreInteractions(log)
}
}

View File

@ -2,6 +2,7 @@ package net.corda.services.messaging
import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCConnection
import net.corda.core.crypto.generateKeyPair
import net.corda.core.crypto.random63BitValue
import net.corda.core.flows.FlowLogic
@ -142,8 +143,14 @@ abstract class MQSecurityTest : NodeBasedTest() {
return client
}
fun loginToRPC(target: NetworkHostAndPort, rpcUser: User): CordaRPCOps {
return CordaRPCClient(target).start(rpcUser.username, rpcUser.password).proxy
private val rpcConnections = mutableListOf<CordaRPCConnection>()
private fun loginToRPC(target: NetworkHostAndPort, rpcUser: User): CordaRPCOps {
return CordaRPCClient(target).start(rpcUser.username, rpcUser.password).also { rpcConnections.add(it) }.proxy
}
@After
fun closeRPCConnections() {
rpcConnections.forEach { it.forceClose() }
}
fun loginToRPCAndGetClientQueue(): String {