RPC: allow trackRpcCallSites to be set from the command line. Add logging.

This commit is contained in:
Mike Hearn 2018-08-27 21:04:44 +02:00
parent f856a77c96
commit 0f8a6e44ea
4 changed files with 39 additions and 20 deletions

View File

@ -43,7 +43,7 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor(
* returned Observable stream the stack trace of the originating RPC will be shown as well. Note that
* constructing call stacks is a moderately expensive operation.
*/
open val trackRpcCallSites: Boolean = false,
open val trackRpcCallSites: Boolean = java.lang.Boolean.getBoolean("net.corda.client.rpc.trackRpcCallSites"),
/**
* The interval of unused observable reaping. Leaked Observables (unused ones) are detected using weak references

View File

@ -97,12 +97,18 @@ class RPCClientProxyHandler(
// To check whether toString() is being invoked
val toStringMethod: Method = Object::toString.javaMethod!!
private fun addRpcCallSiteToThrowable(throwable: Throwable, callSite: Throwable) {
private fun addRpcCallSiteToThrowable(throwable: Throwable, callSite: CallSite) {
var currentThrowable = throwable
while (true) {
val cause = currentThrowable.cause
if (cause == null) {
currentThrowable.initCause(callSite)
try {
currentThrowable.initCause(callSite)
} catch (e: IllegalStateException) {
// OK, we did our best, but the first throwable with a null cause was instantiated using
// Throwable(Throwable) or Throwable(String, Throwable) which means initCause can't ever
// be called even if it was passed null.
}
break
} else {
currentThrowable = cause
@ -146,15 +152,17 @@ class RPCClientProxyHandler(
private fun createRpcObservableMap(): RpcObservableMap {
val onObservableRemove = RemovalListener<InvocationId, UnicastSubject<Notification<*>>> { key, _, cause ->
val observableId = key!!
val rpcCallSite = callSiteMap?.remove(observableId)
val rpcCallSite: CallSite? = callSiteMap?.remove(observableId)
if (cause == RemovalCause.COLLECTED) {
log.warn(listOf(
"A hot observable returned from an RPC was never subscribed to.",
"This wastes server-side resources because it was queueing observations for retrieval.",
"It is being closed now, but please adjust your code to call .notUsed() on the observable",
"to close it explicitly. (Java users: subscribe to it then unsubscribe). This warning",
"will appear less frequently in future versions of the platform and you can ignore it",
"if you want to.").joinToString(" "), rpcCallSite)
"to close it explicitly. (Java users: subscribe to it then unsubscribe). If you aren't sure",
"where the leak is coming from, set -Dnet.corda.client.rpc.trackRpcCallSites=true on the JVM",
"command line and you will get a stack trace with this warning."
).joinToString(" "), rpcCallSite)
rpcCallSite?.printStackTrace()
}
observablesToReap.locked { observables.add(observableId) }
}
@ -215,6 +223,9 @@ class RPCClientProxyHandler(
startSessions()
}
/** A throwable that doesn't represent a real error - it's just here to wrap a stack trace. */
class CallSite(val rpcName: String) : Throwable("<Call site of root RPC '$rpcName'>")
// This is the general function that transforms a client side RPC to internal Artemis messages.
override fun invoke(proxy: Any, method: Method, arguments: Array<out Any?>?): Any? {
lifeCycle.requireState { it == State.STARTED || it == State.SERVER_VERSION_NOT_SET }
@ -230,7 +241,7 @@ class RPCClientProxyHandler(
throw RPCException("RPC server is not available.")
val replyId = InvocationId.newInstance()
callSiteMap?.set(replyId, Throwable("<Call site of root RPC '${method.name}'>"))
callSiteMap?.set(replyId, CallSite(method.name))
try {
val serialisedArguments = (arguments?.toList() ?: emptyList()).serialize(context = serializationContextWithObservableContext)
val request = RPCApi.ClientToServer.RpcRequest(
@ -273,7 +284,7 @@ class RPCClientProxyHandler(
// The handler for Artemis messages.
private fun artemisMessageHandler(message: ClientMessage) {
fun completeExceptionally(id: InvocationId, e: Throwable, future: SettableFuture<Any?>?) {
val rpcCallSite: Throwable? = callSiteMap?.get(id)
val rpcCallSite: CallSite? = callSiteMap?.get(id)
if (rpcCallSite != null) addRpcCallSiteToThrowable(e, rpcCallSite)
future?.setException(e.cause ?: e)
}
@ -555,13 +566,14 @@ class RPCClientProxyHandler(
private typealias RpcObservableMap = Cache<InvocationId, UnicastSubject<Notification<*>>>
private typealias RpcReplyMap = ConcurrentHashMap<InvocationId, SettableFuture<Any?>>
private typealias CallSiteMap = ConcurrentHashMap<InvocationId, Throwable?>
private typealias CallSiteMap = ConcurrentHashMap<InvocationId, RPCClientProxyHandler.CallSite?>
/**
* Holds a context available during de-serialisation of messages that are expected to contain Observables.
*
* @param observableMap holds the Observables that are ultimately exposed to the user.
* @param hardReferenceStore holds references to Observables we want to keep alive while they are subscribed to.
* @property observableMap holds the Observables that are ultimately exposed to the user.
* @property hardReferenceStore holds references to Observables we want to keep alive while they are subscribed to.
* @property callSiteMap keeps stack traces captured when an RPC was invoked, useful for debugging when an observable leaks.
*/
data class ObservableContext(
val callSiteMap: CallSiteMap?,

View File

@ -2,8 +2,10 @@ package net.corda.client.rpc.internal.serialization.amqp
import net.corda.client.rpc.internal.ObservableContext
import net.corda.client.rpc.internal.RPCClientProxyHandler
import net.corda.core.context.Trace
import net.corda.core.serialization.SerializationContext
import net.corda.core.utilities.loggerFor
import net.corda.nodeapi.RPCApi
import net.corda.serialization.internal.amqp.*
import org.apache.qpid.proton.codec.Data
@ -17,11 +19,12 @@ import java.util.concurrent.atomic.AtomicInteger
import javax.transaction.NotSupportedException
/**
* De-serializer for Rx[Observable] instances for the RPC Client library. Can only be used to deserialize such objects,
* just as the corresponding RPC server side code ([RpcServerObservableSerializer]) can only serialize them. Observables are only notionally serialized,
* what is actually sent is a reference to the observable that can then be subscribed to.
* De-serializer for Rx [Observable] instances for the RPC Client library. Can only be used to deserialize such objects,
* just as the corresponding RPC server side class [RpcServerObservableSerializer] can only serialize them. Observables
* are only notionally serialized, what is actually sent is a reference to the observable that can then be subscribed to.
*/
object RpcClientObservableDeSerializer : CustomSerializer.Implements<Observable<*>>(Observable::class.java) {
private val log = loggerFor<RpcClientObservableDeSerializer>()
private object RpcObservableContextKey
fun createContext(
@ -96,22 +99,23 @@ object RpcClientObservableDeSerializer : CustomSerializer.Implements<Observable<
}
val rpcCallSite = getRpcCallSite(context, observableContext)
observableContext.observableMap.put(observableId, observable)
observableContext.callSiteMap?.put(observableId, rpcCallSite)
log.trace("Deserialising observable $observableId", rpcCallSite)
// We pin all Observables into a hard reference store (rooted in the RPC proxy) on subscription so that users
// don't need to store a reference to the Observables themselves.
return pinInSubscriptions(observable, observableContext.hardReferenceStore).doOnUnsubscribe {
// This causes Future completions to give warnings because the corresponding OnComplete sent from the server
// will arrive after the client unsubscribes from the observable and consequently invalidates the mapping.
// The unsubscribe is due to [ObservableToFuture]'s use of first().
// The unsubscribe is due to ObservableToFuture's use of first().
observableContext.observableMap.invalidate(observableId)
}.dematerialize<Any>()
}
private fun getRpcCallSite(context: SerializationContext, observableContext: ObservableContext): Throwable? {
private fun getRpcCallSite(context: SerializationContext, observableContext: ObservableContext): RPCClientProxyHandler.CallSite? {
val rpcRequestOrObservableId = context.properties[RPCApi.RpcRequestOrObservableIdKey] as Trace.InvocationId
// Will only return non-null if the trackRpcCallSites option in the RPC configuration has been specified.
return observableContext.callSiteMap?.get(rpcRequestOrObservableId)
}

View File

@ -2,6 +2,7 @@ package net.corda.node.serialization.amqp
import net.corda.core.context.Trace
import net.corda.core.serialization.SerializationContext
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.loggerFor
import net.corda.node.services.messaging.ObservableContextInterface
import net.corda.node.services.messaging.ObservableSubscription
@ -30,8 +31,9 @@ class RpcServerObservableSerializer : CustomSerializer.Implements<Observable<*>>
fun createContext(
serializationContext: SerializationContext,
observableContext: ObservableContextInterface
) = serializationContext.withProperty(
RpcServerObservableSerializer.RpcObservableContextKey, observableContext)
) = serializationContext.withProperty(RpcServerObservableSerializer.RpcObservableContextKey, observableContext)
val log = contextLogger()
}
override val schemaForDocumentation = Schema(
@ -136,5 +138,6 @@ class RpcServerObservableSerializer : CustomSerializer.Implements<Observable<*>>
}
}
observableContext.observableMap.put(observableId, observableWithSubscription)
log.trace("Serialized observable $observableId of type $obj")
}
}