#592: Address more comments

This commit is contained in:
Andras Slemmer 2017-05-04 13:41:28 +01:00
parent de88ad4f40
commit 34517f653a
8 changed files with 101 additions and 71 deletions

View File

@ -32,11 +32,11 @@ data class RPCClientConfiguration(
*/ */
val trackRpcCallSites: Boolean, val trackRpcCallSites: Boolean,
/** /**
* The interval of unused observable reaping in milliseconds. Leaked Observables (unused ones) are * The interval of unused observable reaping. Leaked Observables (unused ones) are detected using weak references
* detected using weak references and are cleaned up in batches in this interval. If set too large it will waste * and are cleaned up in batches in this interval. If set too large it will waste server side resources for this
* server side resources for this duration. If set too low it wastes client side cycles. * duration. If set too low it wastes client side cycles.
*/ */
val reapIntervalMs: Long, val reapInterval: Duration,
/** The number of threads to use for observations (for executing [Observable.onNext]) */ /** The number of threads to use for observations (for executing [Observable.onNext]) */
val observationExecutorPoolSize: Int, val observationExecutorPoolSize: Int,
/** The maximum number of producers to create to handle outgoing messages */ /** The maximum number of producers to create to handle outgoing messages */
@ -61,7 +61,7 @@ data class RPCClientConfiguration(
val default = RPCClientConfiguration( val default = RPCClientConfiguration(
minimumServerProtocolVersion = 0, minimumServerProtocolVersion = 0,
trackRpcCallSites = false, trackRpcCallSites = false,
reapIntervalMs = 1000, reapInterval = 1.seconds,
observationExecutorPoolSize = 4, observationExecutorPoolSize = 4,
producerPoolBound = 1, producerPoolBound = 1,
cacheConcurrencyLevel = 8, cacheConcurrencyLevel = 8,

View File

@ -165,8 +165,8 @@ class RPCClientProxyHandler(
lifeCycle.transition(State.UNSTARTED, State.SERVER_VERSION_NOT_SET) lifeCycle.transition(State.UNSTARTED, State.SERVER_VERSION_NOT_SET)
reaperScheduledFuture = reaperExecutor.scheduleAtFixedRate( reaperScheduledFuture = reaperExecutor.scheduleAtFixedRate(
this::reapObservables, this::reapObservables,
rpcConfiguration.reapIntervalMs, rpcConfiguration.reapInterval.toMillis(),
rpcConfiguration.reapIntervalMs, rpcConfiguration.reapInterval.toMillis(),
TimeUnit.MILLISECONDS TimeUnit.MILLISECONDS
) )
sessionAndProducerPool.run { sessionAndProducerPool.run {

View File

@ -1,34 +1,22 @@
package net.corda.client.rpc package net.corda.client.rpc
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import net.corda.client.rpc.internal.RPCClientConfiguration import net.corda.client.rpc.internal.RPCClientConfiguration
import net.corda.core.future import net.corda.core.future
import net.corda.core.messaging.RPCOps import net.corda.core.messaging.RPCOps
import net.corda.core.millis
import net.corda.core.random63BitValue import net.corda.core.random63BitValue
import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.CordaSerializable
import net.corda.core.utilities.loggerFor
import net.corda.node.driver.poll
import net.corda.node.services.messaging.RPCServerConfiguration import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.nodeapi.RPCApi
import net.corda.testing.RPCDriverExposedDSLInterface import net.corda.testing.RPCDriverExposedDSLInterface
import net.corda.testing.rpcDriver import net.corda.testing.rpcDriver
import net.corda.testing.startRandomRpcClient
import net.corda.testing.startRpcClient
import org.apache.activemq.artemis.api.core.SimpleString
import org.junit.Ignore
import org.junit.Test import org.junit.Test
import org.junit.runner.RunWith import org.junit.runner.RunWith
import org.junit.runners.Parameterized import org.junit.runners.Parameterized
import rx.Observable import rx.Observable
import rx.subjects.PublishSubject
import rx.subjects.UnicastSubject import rx.subjects.UnicastSubject
import java.util.* import java.util.*
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
@RunWith(Parameterized::class) @RunWith(Parameterized::class)
class RPCConcurrencyTests : AbstractRPCTest() { class RPCConcurrencyTests : AbstractRPCTest() {
@ -98,7 +86,7 @@ class RPCConcurrencyTests : AbstractRPCTest() {
return testProxy<TestOps>( return testProxy<TestOps>(
testOpsImpl, testOpsImpl,
clientConfiguration = RPCClientConfiguration.default.copy( clientConfiguration = RPCClientConfiguration.default.copy(
reapIntervalMs = 100, reapInterval = 100.millis,
cacheConcurrencyLevel = 16 cacheConcurrencyLevel = 16
), ),
serverConfiguration = RPCServerConfiguration.default.copy( serverConfiguration = RPCServerConfiguration.default.copy(

View File

@ -1,31 +1,25 @@
package net.corda.client.rpc package net.corda.client.rpc
import com.codahale.metrics.ConsoleReporter
import com.codahale.metrics.Gauge import com.codahale.metrics.Gauge
import com.codahale.metrics.JmxReporter import com.codahale.metrics.JmxReporter
import com.codahale.metrics.MetricRegistry import com.codahale.metrics.MetricRegistry
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.pool.KryoPool
import com.google.common.base.Stopwatch import com.google.common.base.Stopwatch
import net.corda.client.rpc.internal.RPCClientConfiguration import net.corda.client.rpc.internal.RPCClientConfiguration
import net.corda.core.messaging.RPCOps import net.corda.core.messaging.RPCOps
import net.corda.core.millis import net.corda.core.minutes
import net.corda.core.random63BitValue import net.corda.core.seconds
import net.corda.core.utilities.Rate
import net.corda.core.utilities.div
import net.corda.node.driver.ShutdownManager import net.corda.node.driver.ShutdownManager
import net.corda.node.services.messaging.RPCServerConfiguration import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.nodeapi.RPCApi
import net.corda.nodeapi.RPCKryo
import net.corda.testing.RPCDriverExposedDSLInterface import net.corda.testing.RPCDriverExposedDSLInterface
import net.corda.testing.measure import net.corda.testing.measure
import net.corda.testing.rpcDriver import net.corda.testing.rpcDriver
import org.apache.activemq.artemis.api.core.SimpleString
import org.junit.Ignore import org.junit.Ignore
import org.junit.Test import org.junit.Test
import org.junit.runner.RunWith import org.junit.runner.RunWith
import org.junit.runners.Parameterized import org.junit.runners.Parameterized
import rx.Observable
import java.time.Duration import java.time.Duration
import java.util.* import java.util.*
import java.util.concurrent.* import java.util.concurrent.*
@ -140,23 +134,24 @@ class RPCPerformanceTests : AbstractRPCTest() {
@Test @Test
fun `consumption rate`() { fun `consumption rate`() {
rpcDriver { rpcDriver {
val metricRegistry = startJmxReporter() val metricRegistry = startReporter()
val proxy = testProxy( val proxy = testProxy(
RPCClientConfiguration.default.copy( RPCClientConfiguration.default.copy(
reapIntervalMs = 100, reapInterval = 1.seconds,
cacheConcurrencyLevel = 16 cacheConcurrencyLevel = 16,
producerPoolBound = 8
), ),
RPCServerConfiguration.default.copy( RPCServerConfiguration.default.copy(
rpcThreadPoolSize = 4, rpcThreadPoolSize = 8,
consumerPoolSize = 4, consumerPoolSize = 1,
producerPoolBound = 4 producerPoolBound = 8
) )
) )
measurePerformancePublishMetrics( measurePerformancePublishMetrics(
metricRegistry = metricRegistry, metricRegistry = metricRegistry,
parallelism = 4, parallelism = 8,
overallDurationSecond = 120.0, overallDuration = 5.minutes,
injectionRatePerSecond = 20000.0, injectionRate = 20000L / TimeUnit.SECONDS,
queueSizeMetricName = "$mode.QueueSize", queueSizeMetricName = "$mode.QueueSize",
workDurationMetricName = "$mode.WorkDuration", workDurationMetricName = "$mode.WorkDuration",
shutdownManager = this.shutdownManager, shutdownManager = this.shutdownManager,
@ -205,8 +200,8 @@ class RPCPerformanceTests : AbstractRPCTest() {
fun measurePerformancePublishMetrics( fun measurePerformancePublishMetrics(
metricRegistry: MetricRegistry, metricRegistry: MetricRegistry,
parallelism: Int, parallelism: Int,
overallDurationSecond: Double, overallDuration: Duration,
injectionRatePerSecond: Double, injectionRate: Rate,
queueSizeMetricName: String, queueSizeMetricName: String,
workDurationMetricName: String, workDurationMetricName: String,
shutdownManager: ShutdownManager, shutdownManager: ShutdownManager,
@ -238,7 +233,7 @@ fun measurePerformancePublishMetrics(
} }
val injector = executor.scheduleAtFixedRate( val injector = executor.scheduleAtFixedRate(
{ {
workSemaphore.release(injectionRatePerSecond.toInt()) workSemaphore.release((injectionRate * TimeUnit.SECONDS).toInt())
}, },
0, 0,
1, 1,
@ -251,7 +246,7 @@ fun measurePerformancePublishMetrics(
workExecutor.awaitTermination(1, TimeUnit.SECONDS) workExecutor.awaitTermination(1, TimeUnit.SECONDS)
executor.awaitTermination(1, TimeUnit.SECONDS) executor.awaitTermination(1, TimeUnit.SECONDS)
} }
Thread.sleep((overallDurationSecond * 1000).toLong()) Thread.sleep(overallDuration.toMillis())
} }
fun startInjectorWithBoundedQueue( fun startInjectorWithBoundedQueue(
@ -289,7 +284,7 @@ fun startInjectorWithBoundedQueue(
injector.join() injector.join()
} }
fun RPCDriverExposedDSLInterface.startJmxReporter(): MetricRegistry { fun RPCDriverExposedDSLInterface.startReporter(): MetricRegistry {
val metricRegistry = MetricRegistry() val metricRegistry = MetricRegistry()
val jmxReporter = thread { val jmxReporter = thread {
JmxReporter. JmxReporter.
@ -307,9 +302,14 @@ fun RPCDriverExposedDSLInterface.startJmxReporter(): MetricRegistry {
build(). build().
start() start()
} }
val consoleReporter = thread {
ConsoleReporter.forRegistry(metricRegistry).build().start(1, TimeUnit.SECONDS)
}
shutdownManager.registerShutdown { shutdownManager.registerShutdown {
jmxReporter.interrupt() jmxReporter.interrupt()
consoleReporter.interrupt()
jmxReporter.join() jmxReporter.join()
consoleReporter.join()
} }
return metricRegistry return metricRegistry
} }

View File

@ -148,13 +148,19 @@ fun <T : Any> T.serialize(kryo: KryoPool = p2PKryo(), internalOnly: Boolean = fa
} }
private val serializeBufferPool = LazyPool { ByteArray(64 * 1024) } private val serializeBufferPool = LazyPool(
private val serializeOutputStreamPool = LazyPool(ByteArrayOutputStream::reset) { ByteArrayOutputStream(64 * 1024) } newInstance = { ByteArray(64 * 1024) }
)
private val serializeOutputStreamPool = LazyPool(
clear = ByteArrayOutputStream::reset,
shouldReturnToPool = { it.size() < 256 * 1024 }, // Discard if it grew too large
newInstance = { ByteArrayOutputStream(64 * 1024) }
)
fun <T : Any> T.serialize(kryo: Kryo, internalOnly: Boolean = false): SerializedBytes<T> { fun <T : Any> T.serialize(kryo: Kryo, internalOnly: Boolean = false): SerializedBytes<T> {
return serializeOutputStreamPool.run { stream -> return serializeOutputStreamPool.run { stream ->
serializeBufferPool.run { buffer -> serializeBufferPool.run { buffer ->
Output(buffer).use { Output(buffer).use {
it.setOutputStream(stream) it.outputStream = stream
it.writeBytes(KryoHeaderV0_1.bytes) it.writeBytes(KryoHeaderV0_1.bytes)
kryo.writeClassAndObject(it, this) kryo.writeClassAndObject(it, this)
} }

View File

@ -1,23 +1,28 @@
package net.corda.core.utilities package net.corda.core.utilities
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.Semaphore
/** /**
* A lazy pool of resources [A]. * A lazy pool of resources [A].
* *
* @param clear If specified this function will be run on each borrowed instance before handing it over. * @param clear If specified this function will be run on each borrowed instance before handing it over.
* @param shouldReturnToPool If specified this function will be run on each release to determine whether the instance
* should be returned to the pool for reuse. This may be useful for pooled resources that dynamically grow during
* usage, and we may not want to retain them forever.
* @param bound If specified the pool will be bounded. Once all instances are borrowed subsequent borrows will block until an * @param bound If specified the pool will be bounded. Once all instances are borrowed subsequent borrows will block until an
* instance is released. * instance is released.
* @param create The function to call to lazily create a pooled resource. * @param newInstance The function to call to lazily newInstance a pooled resource.
*/ */
class LazyPool<A>( class LazyPool<A>(
private val clear: ((A) -> Unit)? = null, private val clear: ((A) -> Unit)? = null,
private val shouldReturnToPool: ((A) -> Boolean)? = null,
private val bound: Int? = null, private val bound: Int? = null,
private val create: () -> A private val newInstance: () -> A
) { ) {
private val poolQueue = LinkedBlockingQueue<A>() private val poolQueue = ConcurrentLinkedQueue<A>()
private var poolSize = 0 private val poolSemaphore = Semaphore(bound ?: Int.MAX_VALUE)
private enum class State { private enum class State {
STARTED, STARTED,
@ -32,23 +37,10 @@ class LazyPool<A>(
fun borrow(): A { fun borrow(): A {
lifeCycle.requireState(State.STARTED) lifeCycle.requireState(State.STARTED)
poolSemaphore.acquire()
val pooled = poolQueue.poll() val pooled = poolQueue.poll()
if (pooled == null) { if (pooled == null) {
if (bound != null) { return newInstance()
val waitForRelease = synchronized(this) {
if (poolSize < bound) {
poolSize++
false
} else {
true
}
}
if (waitForRelease) {
// Wait until one is released
return clearIfNeeded(poolQueue.take())
}
}
return create()
} else { } else {
return clearIfNeeded(pooled) return clearIfNeeded(pooled)
} }
@ -56,8 +48,11 @@ class LazyPool<A>(
fun release(instance: A) { fun release(instance: A) {
lifeCycle.requireState(State.STARTED) lifeCycle.requireState(State.STARTED)
if (shouldReturnToPool == null || shouldReturnToPool.invoke(instance)) {
poolQueue.add(instance) poolQueue.add(instance)
} }
poolSemaphore.release()
}
/** /**
* Closes the pool. Note that all borrowed instances must have been released before calling this function, otherwise * Closes the pool. Note that all borrowed instances must have been released before calling this function, otherwise

View File

@ -6,10 +6,17 @@ import java.util.concurrent.LinkedBlockingQueue
/** /**
* A [LazyStickyPool] is a lazy pool of resources where a [borrow] may "stick" the borrowed instance to an object. * A [LazyStickyPool] is a lazy pool of resources where a [borrow] may "stick" the borrowed instance to an object.
* Any subsequent borrows using the same object will return the same pooled instance. * Any subsequent borrows using the same object will return the same pooled instance.
*
* @param size The size of the pool.
* @param shouldReturnToPool If specified this function will be run on each release to determine whether the instance
* should be returned to the pool for reuse. This may be useful for pooled resources that dynamically grow during
* usage, and we may not want to retain them forever.
* @param newInstance The function to call to create a pooled resource.
*/ */
// TODO This could be implemented more efficiently. Currently the "non-sticky" use case is not optimised, it just chooses a random instance to wait on. // TODO This could be implemented more efficiently. Currently the "non-sticky" use case is not optimised, it just chooses a random instance to wait on.
class LazyStickyPool<A : Any>( class LazyStickyPool<A : Any>(
size: Int, size: Int,
private val shouldReturnToPool: ((A) -> Boolean)? = null,
private val newInstance: () -> A private val newInstance: () -> A
) { ) {
private class InstanceBox<A> { private class InstanceBox<A> {
@ -45,7 +52,12 @@ class LazyStickyPool<A : Any>(
fun release(stickTo: Any, instance: A) { fun release(stickTo: Any, instance: A) {
val box = boxes[toIndex(stickTo)] val box = boxes[toIndex(stickTo)]
if (shouldReturnToPool == null || shouldReturnToPool.invoke(instance)) {
box.instance!!.add(instance) box.instance!!.add(instance)
} else {
// We need to create a new instance instead of setting the queue to null to unblock potentially waiting threads.
box.instance!!.add(newInstance())
}
} }
inline fun <R> run(stickToOrNull: Any? = null, withInstance: (A) -> R): R { inline fun <R> run(stickToOrNull: Any? = null, withInstance: (A) -> R): R {

View File

@ -0,0 +1,29 @@
package net.corda.core.utilities
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeUnit
/**
* [Rate] holds a quantity denoting the frequency of some event e.g. 100 times per second or 2 times per day.
*/
data class Rate(
val numberOfEvents: Long,
val perTimeUnit: TimeUnit
) {
/**
* Returns the interval between two subsequent events.
*/
fun toInterval(): Duration {
return Duration.of(TimeUnit.NANOSECONDS.convert(1, perTimeUnit) / numberOfEvents, ChronoUnit.NANOS)
}
/**
* Converts the number of events to the given unit.
*/
operator fun times(inUnit: TimeUnit): Long {
return inUnit.convert(numberOfEvents, perTimeUnit)
}
}
operator fun Long.div(timeUnit: TimeUnit) = Rate(this, timeUnit)