RPC: call close() on startup failure, add thread leak tests

Andras Slemmer
2017-05-10 15:30:36 +01:00
parent 5f5f51bf51
commit 7c3a566197
12 changed files with 242 additions and 176 deletions
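The substance of the change is the close-on-failure idiom applied in RPCClient.connect() below: once the proxy handler and server locator have been started, any failure before the connection is handed to the caller must close them again, otherwise their reaper and Artemis threads leak. A minimal sketch of the idiom, using a hypothetical Connection type rather than Corda's actual classes:

import java.io.Closeable

// Sketch of the close-on-failure idiom this commit applies to RPCClient.connect().
// Connection, start() and handshake() are hypothetical stand-ins, not Corda API.
class Connection : Closeable {
    fun start() { /* acquire threads, sockets, sessions... */ }
    fun handshake() { /* may throw, e.g. server protocol version too low */ }
    override fun close() { /* release everything acquired so far */ }
}

fun connect(): Connection {
    val connection = Connection()
    try {
        connection.start()
        connection.handshake()
        return connection  // success: the caller now owns the resources
    } catch (e: Throwable) {
        connection.close() // failure after acquisition: release, then rethrow
        throw e
    }
}

The diff below has the same shape: a try around everything after construction, with close() plus a rethrow in the catch.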


@@ -5,28 +5,100 @@ import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.pool.KryoPool
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures
import net.corda.client.rpc.internal.RPCClientConfiguration
import net.corda.core.ErrorOr
import net.corda.core.getOrThrow
import net.corda.core.messaging.RPCOps
import net.corda.core.millis
import net.corda.core.random63BitValue
import net.corda.node.driver.poll
import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.nodeapi.RPCApi
import net.corda.nodeapi.RPCKryo
import net.corda.testing.*
import org.apache.activemq.artemis.api.core.SimpleString
import org.junit.Assert.assertEquals
import org.junit.Test
import rx.Observable
import rx.subjects.PublishSubject
import rx.subjects.UnicastSubject
import java.time.Duration
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.assertEquals

class RPCStabilityTests {

    object DummyOps : RPCOps {
        override val protocolVersion = 0
    }

    private fun waitUntilNumberOfThreadsStable(executorService: ScheduledExecutorService): Int {
        val values = ConcurrentLinkedQueue<Int>()
        return poll(executorService, "number of threads to become stable", 250.millis) {
            values.add(Thread.activeCount())
            if (values.size > 5) {
                values.poll()
            }
            val first = values.peek()
            if (values.size == 5 && values.all { it == first }) {
                first
            } else {
                null
            }
        }.get()
    }

    @Test
    fun `client and server dont leak threads`() {
        val executor = Executors.newScheduledThreadPool(1)
        fun startAndStop() {
            rpcDriver {
                val server = startRpcServer<RPCOps>(ops = DummyOps)
                startRpcClient<RPCOps>(server.get().hostAndPort).get()
            }
        }
        for (i in 1..5) {
            startAndStop()
        }
        val numberOfThreadsBefore = waitUntilNumberOfThreadsStable(executor)
        for (i in 1..5) {
            startAndStop()
        }
        val numberOfThreadsAfter = waitUntilNumberOfThreadsStable(executor)
        // This is an at-most check rather than a strict equality because threads from other tests may be shutting
        // down while this test is running; it is therefore a "best effort" check. When this test is run on its own
        // it should hold with equality.
        require(numberOfThreadsBefore >= numberOfThreadsAfter)
        executor.shutdownNow()
    }

    @Test
    fun `client doesnt leak threads when it fails to start`() {
        val executor = Executors.newScheduledThreadPool(1)
        fun startAndStop() {
            rpcDriver {
                // Connection failure: nothing is listening on this address.
                ErrorOr.catch { startRpcClient<RPCOps>(HostAndPort.fromString("localhost:9999")).get() }
                // Handshake failure: the client requires a protocol version the server does not meet.
                val server = startRpcServer<RPCOps>(ops = DummyOps)
                ErrorOr.catch { startRpcClient<RPCOps>(server.get().hostAndPort, configuration = RPCClientConfiguration.default.copy(minimumServerProtocolVersion = 1)).get() }
            }
        }
        for (i in 1..5) {
            startAndStop()
        }
        val numberOfThreadsBefore = waitUntilNumberOfThreadsStable(executor)
        for (i in 1..5) {
            startAndStop()
        }
        val numberOfThreadsAfter = waitUntilNumberOfThreadsStable(executor)
        require(numberOfThreadsBefore >= numberOfThreadsAfter)
        executor.shutdownNow()
    }

    interface LeakObservableOps : RPCOps {
        fun leakObservable(): Observable<Nothing>
    }
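
The thread-count helper above relies on poll from net.corda.node.driver, which is not part of this diff. A plausible reduced version, assuming it simply reruns the check on the supplied scheduler until the check returns non-null; the real Corda helper differs in detail (it returns a Guava ListenableFuture and adds logging around the poll name):

import java.time.Duration
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit

// Plausible reduced version of the poll() helper used by the tests above; the
// signature mirrors the call sites only loosely.
fun <A : Any> poll(
        executor: ScheduledExecutorService,
        pollName: String,
        pollInterval: Duration,
        check: () -> A?
): CompletableFuture<A> {
    val result = CompletableFuture<A>()
    val task = object : Runnable {
        override fun run() {
            try {
                val value = check()
                if (value != null) {
                    result.complete(value) // pollName's condition is now satisfied
                } else {
                    // Not ready yet: run the check again after the poll interval.
                    executor.schedule(this, pollInterval.toMillis(), TimeUnit.MILLISECONDS)
                }
            } catch (e: Throwable) {
                result.completeExceptionally(e)
            }
        }
    }
    executor.execute(task)
    return result
}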


@@ -147,26 +147,32 @@
         }
         val proxyHandler = RPCClientProxyHandler(rpcConfiguration, username, password, serverLocator, clientAddress, rpcOpsClass)
-        proxyHandler.start()
-        @Suppress("UNCHECKED_CAST")
-        val ops = Proxy.newProxyInstance(rpcOpsClass.classLoader, arrayOf(rpcOpsClass), proxyHandler) as I
-        val serverProtocolVersion = ops.protocolVersion
-        if (serverProtocolVersion < rpcConfiguration.minimumServerProtocolVersion) {
-            throw RPCException("Requested minimum protocol version (${rpcConfiguration.minimumServerProtocolVersion}) is higher" +
-                    " than the server's supported protocol version ($serverProtocolVersion)")
-        }
-        proxyHandler.setServerProtocolVersion(serverProtocolVersion)
-        log.debug("RPC connected, returning proxy")
-        object : RPCConnection<I> {
-            override val proxy = ops
-            override val serverProtocolVersion = serverProtocolVersion
-            override fun close() {
-                proxyHandler.close()
-                serverLocator.close()
-            }
-        }
+        try {
+            proxyHandler.start()
+            @Suppress("UNCHECKED_CAST")
+            val ops = Proxy.newProxyInstance(rpcOpsClass.classLoader, arrayOf(rpcOpsClass), proxyHandler) as I
+            val serverProtocolVersion = ops.protocolVersion
+            if (serverProtocolVersion < rpcConfiguration.minimumServerProtocolVersion) {
+                throw RPCException("Requested minimum protocol version (${rpcConfiguration.minimumServerProtocolVersion}) is higher" +
+                        " than the server's supported protocol version ($serverProtocolVersion)")
+            }
+            proxyHandler.setServerProtocolVersion(serverProtocolVersion)
+            log.debug("RPC connected, returning proxy")
+            object : RPCConnection<I> {
+                override val proxy = ops
+                override val serverProtocolVersion = serverProtocolVersion
+                override fun close() {
+                    proxyHandler.close()
+                    serverLocator.close()
+                }
+            }
+        } catch (exception: Throwable) {
+            proxyHandler.close()
+            serverLocator.close()
+            throw exception
+        }
     }
 }


@@ -25,16 +25,11 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator
 import rx.Notification
 import rx.Observable
 import rx.subjects.UnicastSubject
-import sun.reflect.CallerSensitive
 import java.lang.reflect.InvocationHandler
 import java.lang.reflect.Method
 import java.util.*
-import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.Executors
-import java.util.concurrent.ScheduledFuture
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.*
 import java.util.concurrent.atomic.AtomicInteger
-import kotlin.collections.ArrayList
 import kotlin.reflect.jvm.javaMethod
 
 /**
@@ -87,10 +82,7 @@
     }
 
     // Used for reaping
-    private val reaperExecutor = Executors.newScheduledThreadPool(
-            1,
-            ThreadFactoryBuilder().setNameFormat("rpc-client-reaper-%d").build()
-    )
+    private var reaperExecutor: ScheduledExecutorService? = null
 
     // A sticky pool for running Observable.onNext()s. We need the stickiness to preserve the observation ordering.
     private val observationExecutorThreadFactory = ThreadFactoryBuilder().setNameFormat("rpc-client-observation-pool-%d").build()
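
The sticky pool mentioned in the comment above is not shown in this commit. The idea is key affinity over single-threaded executors: the same key (for example an observable ID) always maps to the same thread, so onNext() calls for one observable stay ordered, while different observables can still run in parallel. A minimal sketch; Corda's LazyStickyPool differs in detail:

import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

// Minimal sketch of a "sticky" executor pool: forKey() is deterministic per key,
// which serialises work for one key without serialising the whole pool.
class StickyPool(size: Int) {
    private val executors: List<ExecutorService> = List(size) { Executors.newSingleThreadExecutor() }

    fun forKey(key: Any): ExecutorService =
            executors[Math.floorMod(key.hashCode(), executors.size)]

    fun shutdown() = executors.forEach { it.shutdownNow() }
}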
@@ -109,7 +101,7 @@
             hardReferenceStore = Collections.synchronizedSet(mutableSetOf<Observable<*>>())
     )
 
     // Holds a reference to the scheduled reaper.
-    private lateinit var reaperScheduledFuture: ScheduledFuture<*>
+    private var reaperScheduledFuture: ScheduledFuture<*>? = null
     // The protocol version of the server, to be initialised to the value of [RPCOps.protocolVersion]
     private var serverProtocolVersion: Int? = null
@@ -145,7 +137,7 @@
     // TODO We may need to pool these somehow anyway, otherwise if the server sends many big messages in parallel a
     // single consumer may be starved for flow control credits. Recheck this once Artemis's large message streaming is
     // integrated properly.
-    private lateinit var sessionAndConsumer: ArtemisConsumer
+    private var sessionAndConsumer: ArtemisConsumer? = null
     // Pool producers to reduce contention on the client side.
     private val sessionAndProducerPool = LazyPool(bound = rpcConfiguration.producerPoolBound) {
         // Note how we create new sessions *and* session factories per producer.
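
LazyPool's implementation is also outside this diff. A plausible reduced version, assuming a bound on concurrently borrowed instances, reuse of returned ones, and a close() that drains the pool so the caller can close each instance, which matches how sessionAndProducerPool.close() is consumed further down:

import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Semaphore

// Plausible reduced version of the LazyPool referenced above: instances are
// created on demand up to `bound`, reused afterwards, and drained on close().
// The real Corda utility has a richer API; this mirrors it only loosely.
class SimpleLazyPool<A : Any>(bound: Int, private val create: () -> A) {
    private val semaphore = Semaphore(bound)
    private val pool = ConcurrentLinkedQueue<A>()

    fun <R> run(withInstance: (A) -> R): R {
        semaphore.acquire() // block if `bound` instances are already borrowed
        val instance = pool.poll() ?: create()
        try {
            return withInstance(instance)
        } finally {
            pool.add(instance)
            semaphore.release()
        }
    }

    /** Drain the pool for shutdown; returns the instances so callers can close them. */
    fun close(): List<A> = generateSequence { pool.poll() }.toList()
}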
@@ -162,7 +154,12 @@
      * Start the client. This creates the per-client queue, starts the consumer session and the reaper.
      */
     fun start() {
-        reaperScheduledFuture = reaperExecutor.scheduleAtFixedRate(
+        lifeCycle.requireState(State.UNSTARTED)
+        reaperExecutor = Executors.newScheduledThreadPool(
+                1,
+                ThreadFactoryBuilder().setNameFormat("rpc-client-reaper-%d").build()
+        )
+        reaperScheduledFuture = reaperExecutor!!.scheduleAtFixedRate(
                 this::reapObservables,
                 rpcConfiguration.reapInterval.toMillis(),
                 rpcConfiguration.reapInterval.toMillis(),
@@ -187,7 +184,7 @@
         if (method == toStringMethod) {
             return "Client RPC proxy for $rpcOpsClass"
         }
-        if (sessionAndConsumer.session.isClosed) {
+        if (sessionAndConsumer!!.session.isClosed) {
             throw RPCException("RPC Proxy is closed")
         }
         val rpcId = RPCApi.RpcRequestId(random63BitValue())
@@ -268,13 +265,13 @@
      * Closes the RPC proxy. Reaps all observables, shuts down the reaper, closes all sessions and executors.
      */
     fun close() {
-        sessionAndConsumer.consumer.close()
-        sessionAndConsumer.session.close()
-        sessionAndConsumer.sessionFactory.close()
-        reaperScheduledFuture.cancel(false)
+        sessionAndConsumer?.consumer?.close()
+        sessionAndConsumer?.session?.close()
+        sessionAndConsumer?.sessionFactory?.close()
+        reaperScheduledFuture?.cancel(false)
         observableContext.observableMap.invalidateAll()
         reapObservables()
-        reaperExecutor.shutdownNow()
+        reaperExecutor?.shutdownNow()
         sessionAndProducerPool.close().forEach {
             it.producer.close()
             it.session.close()
@@ -284,8 +281,7 @@
         // leak borrowed executors.
         val observationExecutors = observationExecutorPool.close()
         observationExecutors.forEach { it.shutdownNow() }
-        observationExecutors.forEach { it.awaitTermination(100, TimeUnit.MILLISECONDS) }
-        lifeCycle.transition(State.STARTED, State.FINISHED)
+        lifeCycle.transition(State.FINISHED)
     }
 
     /**
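
start() and close() now drive a lifeCycle guard through requireState and transition, whose implementation is not part of this diff. A minimal sketch, assuming a plain UNSTARTED to STARTED to FINISHED progression; Corda's actual lifecycle utility is more general:

import java.util.concurrent.atomic.AtomicReference

// Minimal sketch of the lifeCycle guard used in start()/close() above. The enum
// and class names mirror the call sites; the real implementation differs.
enum class State { UNSTARTED, STARTED, FINISHED }

class LifeCycle(initial: State) {
    private val state = AtomicReference(initial)

    fun requireState(expected: State) {
        check(state.get() == expected) { "Expected state $expected, was ${state.get()}" }
    }

    /** Move to [to] unconditionally; useful when several prior states are legal. */
    fun transition(to: State) {
        state.set(to)
    }

    /** Move from [from] to [to], failing if the current state is anything else. */
    fun transition(from: State, to: State) {
        check(state.compareAndSet(from, to)) { "Expected state $from, was ${state.get()}" }
    }
}

Making the guard's fields nullable rather than lateinit, as the diff above does, is what lets close() run safely even when start() never completed.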