Merge pull request #162 from corda/aslemmer-fix-netty-buffer-leak

Aslemmer fix netty buffer leak
This commit is contained in:
Andras Slemmer 2017-12-08 10:12:35 +00:00 committed by GitHub
commit c794f8418c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 258 additions and 121 deletions

View File

@ -2,19 +2,14 @@ package net.corda.flowhook
import co.paralleluniverse.fibers.Fiber
import net.corda.core.internal.uncheckedCast
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.loggerFor
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
import java.sql.Connection
import java.time.Instant
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.concurrent.thread
/**
* This is a debugging helper class that dumps the map of Fiber->DB connection, or more precisely, the
* Fiber->(DB tx -> DB connection) map, as there may be multiple transactions per fiber.
*/
data class MonitorEvent(val type: MonitorEventType, val keys: List<Any>, val extra: Any? = null)
@ -34,47 +29,62 @@ enum class MonitorEventType {
FiberResumed,
FiberEnded,
ExecuteTransition
SmExecuteTransition,
SmScheduleEvent,
NettyThreadLocalMapCreated,
SetThreadLocals,
SetInheritableThreadLocals,
GetThreadLocals,
GetInheritableThreadLocals
}
/**
* This is a monitor processing events coming from [FlowHookContainer]. It just appends them to a log that allows
* analysis of the events.
*
* Suggested way of debugging using this class and IntelliJ:
* 1. Hook the function calls you're interested in using [FlowHookContainer].
* 2. Add an associated event type in [MonitorEventType].
* 3. Call [newEvent] in the hook. Provide some keys to allow analysis. Example keys are the current fiber ID or a
* specific DB transaction. You can also provide additional info about the event using [MonitorEvent.extra].
* 4. Run your test and break on [newEvent] or [inspect].
* 5. Inspect the [correlator] in the debugger. E.g. you can add a watch for [MonitorEventCorrelator.getByType].
* You can search for specific objects by using filter expressions in the debugger.
*/
object FiberMonitor {
private val log = contextLogger()
private val jobQueue = LinkedBlockingQueue<Job>()
private val log = loggerFor<FiberMonitor>()
private val started = AtomicBoolean(false)
private var trackerThread: Thread? = null
private var executor: ScheduledExecutorService? = null
val correlator = MonitorEventCorrelator()
sealed class Job {
data class NewEvent(val event: FullMonitorEvent) : Job()
object Finish : Job()
}
fun newEvent(event: MonitorEvent) {
if (trackerThread != null) {
jobQueue.add(Job.NewEvent(FullMonitorEvent(Instant.now(), Exception().stackTrace.toList(), event)))
if (executor != null) {
val fullEvent = FullMonitorEvent(Instant.now(), Exception().stackTrace.toList(), event)
executor!!.execute {
processEvent(fullEvent)
}
}
}
fun start() {
if (started.compareAndSet(false, true)) {
require(trackerThread == null)
trackerThread = thread(name = "Fiber monitor", isDaemon = true) {
while (true) {
val job = jobQueue.poll(1, TimeUnit.SECONDS)
when (job) {
is Job.NewEvent -> processEvent(job)
Job.Finish -> return@thread
}
}
}
require(executor == null)
executor = Executors.newSingleThreadScheduledExecutor()
executor!!.scheduleAtFixedRate(this::inspect, 100, 100, TimeUnit.MILLISECONDS)
}
}
private fun processEvent(job: Job.NewEvent) {
correlator.addEvent(job.event)
checkLeakedTransactions(job.event.event)
checkLeakedConnections(job.event.event)
// Break on this function or [newEvent].
private fun inspect() {
}
private fun processEvent(event: FullMonitorEvent) {
correlator.addEvent(event)
checkLeakedTransactions(event.event)
checkLeakedConnections(event.event)
}
inline fun <reified R, A : Any> R.getField(name: String): A {
@ -124,7 +134,7 @@ object FiberMonitor {
private fun checkLeakedConnections(event: MonitorEvent) {
if (event.type == MonitorEventType.FiberParking) {
val events = correlator.events[event.keys[0]]!!
val events = correlator.merged()[event.keys[0]]!!
val acquiredConnections = events.mapNotNullTo(HashSet()) {
if (it.event.type == MonitorEventType.ConnectionAcquired) {
it.event.keys.mapNotNull { it as? Connection }.first()
@ -147,35 +157,47 @@ object FiberMonitor {
}
}
/**
* This class holds the event log.
*
* Each event has a list of key associated with it. "Relatedness" is then the transitive closure of two events sharing a key.
*
* [merged] returns a map from key to related events. Note that an eventlist may be associated by several keys.
* [getUnique] makes these lists unique by keying on the set of keys associated with the events.
* [getByType] simply groups by the type of the keys. This is probably the most useful "top-level" breakdown of events.
*/
class MonitorEventCorrelator {
private val _events = HashMap<Any, ArrayList<FullMonitorEvent>>()
val events: Map<Any, ArrayList<FullMonitorEvent>> get() = _events
private val events = ArrayList<FullMonitorEvent>()
fun getUnique() = events.values.toSet().associateBy { it.flatMap { it.event.keys }.toSet() }
fun getUnique() = merged().values.toSet().associateBy { it.flatMap { it.event.keys }.toSet() }
fun getByType() = events.entries.groupBy { it.key.javaClass }
fun getByType() = merged().entries.groupBy { it.key.javaClass }
fun addEvent(fullMonitorEvent: FullMonitorEvent) {
val list = link(fullMonitorEvent.event.keys)
list.add(fullMonitorEvent)
for (key in fullMonitorEvent.event.keys) {
_events[key] = list
}
events.add(fullMonitorEvent)
}
fun link(keys: List<Any>): ArrayList<FullMonitorEvent> {
fun merged(): Map<Any, List<FullMonitorEvent>> {
val merged = HashMap<Any, ArrayList<FullMonitorEvent>>()
for (event in events) {
val eventLists = HashSet<ArrayList<FullMonitorEvent>>()
for (key in keys) {
val list = _events[key]
for (key in event.event.keys) {
val list = merged[key]
if (list != null) {
eventLists.add(list)
}
}
return when {
eventLists.isEmpty() -> ArrayList()
eventLists.size == 1 -> eventLists.first()
val newList = when (eventLists.size) {
0 -> ArrayList()
1 -> eventLists.first()
else -> mergeAll(eventLists)
}
newList.add(event)
for (key in event.event.keys) {
merged[key] = newList
}
}
return merged
}
fun mergeAll(lists: Collection<List<FullMonitorEvent>>): ArrayList<FullMonitorEvent> {

View File

@ -1,14 +1,8 @@
package net.corda.flowhook
import co.paralleluniverse.fibers.Fiber
import net.corda.node.services.statemachine.ActionExecutor
import net.corda.node.services.statemachine.Event
import net.corda.node.services.statemachine.FlowFiber
import net.corda.node.services.statemachine.StateMachineState
import net.corda.node.services.statemachine.transitions.TransitionResult
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseTransactionManager
import rx.subjects.Subject
import java.sql.Connection
@Suppress("UNUSED")
@ -26,6 +20,12 @@ object FlowHookContainer {
FiberMonitor.newEvent(MonitorEvent(MonitorEventType.FiberStarted, keys = listOf(Fiber.currentFiber())))
}
@JvmStatic
@Hook("net.corda.node.services.statemachine.FlowStateMachineImpl", passThis = true)
fun scheduleEvent(fiber: Any, event: Event) {
FiberMonitor.newEvent(MonitorEvent(MonitorEventType.SmScheduleEvent, keys = listOf(fiber), extra = listOf(event, currentFiberOrThread())))
}
@JvmStatic
@Hook("co.paralleluniverse.fibers.Fiber")
fun onCompleted() {
@ -45,13 +45,13 @@ object FlowHookContainer {
}
@JvmStatic
@Hook("net.corda.node.utilities.DatabaseTransaction", passThis = true, position = HookPosition.After)
@Hook("net.corda.nodeapi.internal.persistence.DatabaseTransaction", passThis = true, position = HookPosition.After)
fun DatabaseTransaction(
transaction: Any,
isolation: Int,
threadLocal: ThreadLocal<*>,
transactionBoundaries: Subject<*, *>,
cordaPersistence: CordaPersistence
threadLocal: Any,
transactionBoundaries: Any,
cordaPersistence: Any
) {
val keys = ArrayList<Any>().apply {
add(transaction)
@ -60,9 +60,65 @@ object FlowHookContainer {
FiberMonitor.newEvent(MonitorEvent(MonitorEventType.TransactionCreated, keys = keys))
}
@JvmStatic
@Hook("io.netty.util.internal.InternalThreadLocalMap", passThis = true, position = HookPosition.After)
fun InternalThreadLocalMap(
internalThreadLocalMap: Any
) {
val keys = listOf(
internalThreadLocalMap,
currentFiberOrThread()
)
FiberMonitor.newEvent(MonitorEvent(MonitorEventType.NettyThreadLocalMapCreated, keys = keys))
}
@JvmStatic
@Hook("co.paralleluniverse.concurrent.util.ThreadAccess")
fun setThreadLocals(thread: Thread, threadLocals: Any?) {
FiberMonitor.newEvent(MonitorEvent(
MonitorEventType.SetThreadLocals,
keys = listOf(currentFiberOrThread()),
extra = threadLocals?.let { FiberMonitor.getThreadLocalMapEntryValues(it) }
))
}
@JvmStatic
@Hook("co.paralleluniverse.concurrent.util.ThreadAccess")
fun setInheritableThreadLocals(thread: Thread, threadLocals: Any?) {
FiberMonitor.newEvent(MonitorEvent(
MonitorEventType.SetInheritableThreadLocals,
keys = listOf(currentFiberOrThread()),
extra = threadLocals?.let { FiberMonitor.getThreadLocalMapEntryValues(it) }
))
}
@JvmStatic
@Hook("co.paralleluniverse.concurrent.util.ThreadAccess")
fun getThreadLocals(thread: Thread): (threadLocals: Any?) -> Unit {
return { threadLocals ->
FiberMonitor.newEvent(MonitorEvent(
MonitorEventType.GetThreadLocals,
keys = listOf(currentFiberOrThread()),
extra = threadLocals?.let { FiberMonitor.getThreadLocalMapEntryValues(it) }
))
}
}
@JvmStatic
@Hook("co.paralleluniverse.concurrent.util.ThreadAccess")
fun getInheritableThreadLocals(thread: Thread): (threadLocals: Any?) -> Unit {
return { threadLocals ->
FiberMonitor.newEvent(MonitorEvent(
MonitorEventType.GetInheritableThreadLocals,
keys = listOf(currentFiberOrThread()),
extra = threadLocals?.let { FiberMonitor.getThreadLocalMapEntryValues(it) }
))
}
}
@JvmStatic
@Hook("com.zaxxer.hikari.HikariDataSource")
fun getConnection(): (Connection) -> Unit {
fun getConnection(): (Any) -> Unit {
val transactionOrThread = currentTransactionOrThread()
FiberMonitor.newEvent(MonitorEvent(MonitorEventType.ConnectionRequested, keys = listOf(transactionOrThread)))
return { connection ->
@ -81,19 +137,23 @@ object FlowHookContainer {
@JvmStatic
@Hook("net.corda.node.services.statemachine.TransitionExecutorImpl")
fun executeTransition(
fiber: FlowFiber,
previousState: StateMachineState,
event: Event,
transition: TransitionResult,
actionExecutor: ActionExecutor
fiber: Any,
previousState: Any,
event: Any,
transition: Any,
actionExecutor: Any
) {
FiberMonitor.newEvent(MonitorEvent(MonitorEventType.ExecuteTransition, keys = listOf(fiber), extra = object {
FiberMonitor.newEvent(MonitorEvent(MonitorEventType.SmExecuteTransition, keys = listOf(fiber), extra = object {
val previousState = previousState
val event = event
val transition = transition
}))
}
private fun currentFiberOrThread(): Any {
return Fiber.currentFiber() ?: Thread.currentThread()
}
private fun currentTransactionOrThread(): Any {
return try {
DatabaseTransactionManager.currentOrNull()

View File

@ -56,28 +56,36 @@ class Hooker(hookContainer: Any) : ClassFileTransformer {
private fun instrumentClass(clazz: CtClass): CtClass? {
val hookMethods = hooks[clazz.name] ?: return null
val usedHookMethods = HashSet<Method>()
var isAnyInstrumented = false
for (method in clazz.declaredBehaviors) {
val hookMethod = instrumentBehaviour(method, hookMethods)
if (hookMethod != null) {
isAnyInstrumented = true
usedHookMethods.add(hookMethod)
}
usedHookMethods.addAll(instrumentBehaviour(method, hookMethods))
}
val unusedHookMethods = hookMethods.values.mapTo(HashSet()) { it.first } - usedHookMethods
if (usedHookMethods.isNotEmpty()) {
println("Hooked methods $usedHookMethods")
}
if (unusedHookMethods.isNotEmpty()) {
println("Unused hook methods $unusedHookMethods")
}
return if (isAnyInstrumented) {
return if (usedHookMethods.isNotEmpty()) {
clazz
} else {
null
}
}
private fun instrumentBehaviour(method: CtBehavior, methodHooks: MethodHooks): Method? {
val signature = Signature(method.name, method.parameterTypes.map { it.name })
val (hookMethod, annotation) = methodHooks[signature] ?: return null
private val objectName = Any::class.java.name
private fun instrumentBehaviour(method: CtBehavior, methodHooks: MethodHooks): List<Method> {
val pairs = methodHooks.mapNotNull { (signature, pair) ->
if (signature.functionName != method.name) return@mapNotNull null
if (signature.parameterTypes.size != method.parameterTypes.size) return@mapNotNull null
for (i in 0 until signature.parameterTypes.size) {
if (signature.parameterTypes[i] != objectName && signature.parameterTypes[i] != method.parameterTypes[i].name) {
return@mapNotNull null
}
}
pair
}
for ((hookMethod, annotation) in pairs) {
val invocationString = if (annotation.passThis) {
"${hookMethod.declaringClass.canonicalName}.${hookMethod.name}(this, \$\$)"
} else {
@ -99,19 +107,32 @@ class Hooker(hookContainer: Any) : ClassFileTransformer {
when {
Function0::class.java.isAssignableFrom(hookMethod.returnType) -> {
method.addLocalVariable("after", classPool.get("kotlin.jvm.functions.Function0"))
method.insertHook("after = $invocationString;")
method.insertAfter("after.invoke();")
method.insertHook("after = null; ${wrapTryCatch("after = $invocationString;")}")
method.insertAfter("if (after != null) ${wrapTryCatch("after.invoke();")}")
}
Function1::class.java.isAssignableFrom(hookMethod.returnType) -> {
method.addLocalVariable("after", classPool.get("kotlin.jvm.functions.Function1"))
method.insertHook("after = $invocationString;")
method.insertAfter("after.invoke((\$w)\$_);")
method.insertHook("after = null; ${wrapTryCatch("after = $invocationString;")}")
method.insertAfter("if (after != null) ${wrapTryCatch("after.invoke((\$w)\$_);")}")
}
else -> {
method.insertHook("$invocationString;")
method.insertHook(wrapTryCatch("$invocationString;"))
}
}
return hookMethod
}
return pairs.map { it.first }
}
companion object {
fun wrapTryCatch(statement: String): String {
return "try { $statement } catch (Throwable throwable) { ${Hooker::class.java.canonicalName}.${Hooker.Companion::exceptionInHook.name}(throwable); }"
}
@JvmStatic
fun exceptionInHook(throwable: Throwable) {
throwable.printStackTrace()
}
}
}
@ -122,7 +143,11 @@ enum class HookPosition {
}
@Target(AnnotationTarget.FUNCTION)
annotation class Hook(val clazz: String, val position: HookPosition = HookPosition.Before, val passThis: Boolean = false)
annotation class Hook(
val clazz: String,
val position: HookPosition = HookPosition.Before,
val passThis: Boolean = false
)
private data class Signature(val functionName: String, val parameterTypes: List<String>)

View File

@ -17,6 +17,7 @@ import net.corda.nodeapi.internal.config.User
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.*
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.PortAllocation
import net.corda.testing.driver.driver
import net.corda.testing.internal.performance.div
import net.corda.testing.internal.performance.startPublishingFixedRateInjector
@ -128,21 +129,40 @@ class NodePerformanceTests : IntegrationTest() {
driver(
notarySpecs = listOf(NotarySpec(DUMMY_NOTARY.name, rpcUsers = listOf(user))),
startNodesInProcess = true,
extraCordappPackagesToScan = listOf("net.corda.finance")
extraCordappPackagesToScan = listOf("net.corda.finance"),
portAllocation = PortAllocation.Incremental(20000)
) {
val notary = defaultNotaryNode.getOrThrow() as NodeHandle.InProcess
val metricRegistry = startReporter(shutdownManager, notary.node.services.monitoringService.metrics)
notary.rpcClientToNode().use("A", "A") { connection ->
println("ISSUING")
val doneFutures = (1..100).toList().parallelStream().map {
val doneFutures = (1..100).toList().map {
connection.proxy.startFlow(::CashIssueFlow, 1.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue
}.toList()
doneFutures.transpose().get()
println("STARTING PAYMENT")
startPublishingFixedRateInjector(metricRegistry, 8, 5.minutes, 100L / TimeUnit.SECONDS) {
startPublishingFixedRateInjector(metricRegistry, 8, 5.minutes, 5L / TimeUnit.SECONDS) {
connection.proxy.startFlow(::CashPaymentFlow, 1.DOLLARS, defaultNotaryIdentity).returnValue.get()
}
}
}
}
@Test
fun `single pay`() {
val user = User("A", "A", setOf(startFlow<CashIssueFlow>(), startFlow<CashPaymentFlow>()))
driver(
notarySpecs = listOf(NotarySpec(DUMMY_NOTARY.name, rpcUsers = listOf(user))),
startNodesInProcess = true,
extraCordappPackagesToScan = listOf("net.corda.finance"),
portAllocation = PortAllocation.Incremental(20000)
) {
val notary = defaultNotaryNode.getOrThrow() as NodeHandle.InProcess
val metricRegistry = startReporter(shutdownManager, notary.node.services.monitoringService.metrics)
notary.rpcClientToNode().use("A", "A") { connection ->
connection.proxy.startFlow(::CashIssueFlow, 1.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.getOrThrow()
connection.proxy.startFlow(::CashPaymentFlow, 1.DOLLARS, defaultNotaryIdentity).returnValue.getOrThrow()
}
}
}
}

View File

@ -294,12 +294,14 @@ class P2PMessagingClient(config: NodeConfiguration,
// processing a message but if so, it'll be parked waiting for us to count down the latch, so
// the session itself is still around and we can still ack messages as a result.
override fun acknowledge() {
messagingExecutor.fetchFrom {
state.locked {
artemisMessage.individualAcknowledge()
artemis.started!!.session.commit()
}
}
}
}
deliverTo(msg, HandlerRegistration(msg.topic, deliverTo), acknowledgeHandle)
} else {
log.warn("Received message ${msg.uniqueMessageId} for ${msg.topic} that doesn't have any registered handlers yet")

View File

@ -6,6 +6,7 @@ import net.corda.core.utilities.contextLogger
import net.corda.node.services.statemachine.*
import net.corda.node.services.statemachine.transitions.FlowContinuation
import net.corda.node.services.statemachine.transitions.TransitionResult
import java.time.Instant
import java.util.concurrent.ConcurrentHashMap
/**
@ -28,13 +29,16 @@ class DumpHistoryOnErrorInterceptor(val delegate: TransitionExecutor) : Transiti
actionExecutor: ActionExecutor
): Pair<FlowContinuation, StateMachineState> {
val (continuation, nextState) = delegate.executeTransition(fiber, previousState, event, transition, actionExecutor)
val transitionRecord = TransitionDiagnosticRecord(fiber.id, previousState, nextState, event, transition, continuation)
val transitionRecord = TransitionDiagnosticRecord(Instant.now(), fiber.id, previousState, nextState, event, transition, continuation)
val record = records.compute(fiber.id) { _, record ->
(record ?: ArrayList()).apply { add(transitionRecord) }
}
if (nextState.checkpoint.errorState is ErrorState.Errored) {
log.warn("Flow ${fiber.id} dirtied, dumping all transitions:\n${record!!.joinToString("\n")}")
for (error in nextState.checkpoint.errorState.errors) {
log.warn("Flow ${fiber.id} error", error.exception)
}
}
if (transition.newState.isRemoved) {

View File

@ -5,6 +5,7 @@ import net.corda.core.utilities.contextLogger
import net.corda.node.services.statemachine.*
import net.corda.node.services.statemachine.transitions.FlowContinuation
import net.corda.node.services.statemachine.transitions.TransitionResult
import java.time.Instant
/**
* This interceptor simply prints all state machine transitions. Useful for debugging.
@ -23,7 +24,7 @@ class PrintingInterceptor(val delegate: TransitionExecutor) : TransitionExecutor
actionExecutor: ActionExecutor
): Pair<FlowContinuation, StateMachineState> {
val (continuation, nextState) = delegate.executeTransition(fiber, previousState, event, transition, actionExecutor)
val transitionRecord = TransitionDiagnosticRecord(fiber.id, previousState, nextState, event, transition, continuation)
val transitionRecord = TransitionDiagnosticRecord(Instant.now(), fiber.id, previousState, nextState, event, transition, continuation)
log.info("Transition for flow ${fiber.id} $transitionRecord")
return Pair(continuation, nextState)
}

View File

@ -6,12 +6,14 @@ import net.corda.node.services.statemachine.Event
import net.corda.node.services.statemachine.StateMachineState
import net.corda.node.services.statemachine.transitions.TransitionResult
import net.corda.node.utilities.ObjectDiffer
import java.time.Instant
/**
* This is a diagnostic record that stores information about a state machine transition and provides pretty printing
* by diffing the two states.
*/
data class TransitionDiagnosticRecord(
val timestamp: Instant,
val flowId: StateMachineRunId,
val previousState: StateMachineState,
val nextState: StateMachineState,
@ -26,6 +28,7 @@ data class TransitionDiagnosticRecord(
listOf(
"",
" --- Transition of flow $flowId ---",
" Timestamp: $timestamp",
" Event: $event",
" Actions: ",
" ${transition.actions.joinToString("\n ")}",