From 34517f653aee661cc4bcf554044f57d29c5eb0b9 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Thu, 4 May 2017 13:41:28 +0100 Subject: [PATCH] #592: Address more comments --- .../corda/client/rpc/internal/RPCClient.kt | 10 ++-- .../rpc/internal/RPCClientProxyHandler.kt | 4 +- .../corda/client/rpc/RPCConcurrencyTests.kt | 16 +----- .../corda/client/rpc/RPCPerformanceTests.kt | 50 +++++++++---------- .../net/corda/core/serialization/Kryo.kt | 12 +++-- .../net/corda/core/utilities/LazyPool.kt | 37 ++++++-------- .../corda/core/utilities/LazyStickyPool.kt | 14 +++++- .../kotlin/net/corda/core/utilities/Rate.kt | 29 +++++++++++ 8 files changed, 101 insertions(+), 71 deletions(-) create mode 100644 core/src/main/kotlin/net/corda/core/utilities/Rate.kt diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt index 60c77c676e..60d50928bd 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt @@ -32,11 +32,11 @@ data class RPCClientConfiguration( */ val trackRpcCallSites: Boolean, /** - * The interval of unused observable reaping in milliseconds. Leaked Observables (unused ones) are - * detected using weak references and are cleaned up in batches in this interval. If set too large it will waste - * server side resources for this duration. If set too low it wastes client side cycles. + * The interval of unused observable reaping. Leaked Observables (unused ones) are detected using weak references + * and are cleaned up in batches in this interval. If set too large it will waste server side resources for this + * duration. If set too low it wastes client side cycles. */ - val reapIntervalMs: Long, + val reapInterval: Duration, /** The number of threads to use for observations (for executing [Observable.onNext]) */ val observationExecutorPoolSize: Int, /** The maximum number of producers to create to handle outgoing messages */ @@ -61,7 +61,7 @@ data class RPCClientConfiguration( val default = RPCClientConfiguration( minimumServerProtocolVersion = 0, trackRpcCallSites = false, - reapIntervalMs = 1000, + reapInterval = 1.seconds, observationExecutorPoolSize = 4, producerPoolBound = 1, cacheConcurrencyLevel = 8, diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt index 6309af41f8..fb0e874b8a 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt @@ -165,8 +165,8 @@ class RPCClientProxyHandler( lifeCycle.transition(State.UNSTARTED, State.SERVER_VERSION_NOT_SET) reaperScheduledFuture = reaperExecutor.scheduleAtFixedRate( this::reapObservables, - rpcConfiguration.reapIntervalMs, - rpcConfiguration.reapIntervalMs, + rpcConfiguration.reapInterval.toMillis(), + rpcConfiguration.reapInterval.toMillis(), TimeUnit.MILLISECONDS ) sessionAndProducerPool.run { diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt index 2e563bc40c..2ffe065832 100644 --- a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt @@ -1,34 +1,22 @@ package net.corda.client.rpc -import com.google.common.util.concurrent.Futures -import com.google.common.util.concurrent.ListenableFuture import net.corda.client.rpc.internal.RPCClientConfiguration import net.corda.core.future import net.corda.core.messaging.RPCOps +import net.corda.core.millis import net.corda.core.random63BitValue import net.corda.core.serialization.CordaSerializable -import net.corda.core.utilities.loggerFor -import net.corda.node.driver.poll import net.corda.node.services.messaging.RPCServerConfiguration -import net.corda.nodeapi.RPCApi import net.corda.testing.RPCDriverExposedDSLInterface import net.corda.testing.rpcDriver -import net.corda.testing.startRandomRpcClient -import net.corda.testing.startRpcClient -import org.apache.activemq.artemis.api.core.SimpleString -import org.junit.Ignore import org.junit.Test import org.junit.runner.RunWith import org.junit.runners.Parameterized import rx.Observable -import rx.subjects.PublishSubject import rx.subjects.UnicastSubject import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CountDownLatch -import java.util.concurrent.Executors -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicInteger @RunWith(Parameterized::class) class RPCConcurrencyTests : AbstractRPCTest() { @@ -98,7 +86,7 @@ class RPCConcurrencyTests : AbstractRPCTest() { return testProxy( testOpsImpl, clientConfiguration = RPCClientConfiguration.default.copy( - reapIntervalMs = 100, + reapInterval = 100.millis, cacheConcurrencyLevel = 16 ), serverConfiguration = RPCServerConfiguration.default.copy( diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt index fa804dd2c3..8d1fdcb65b 100644 --- a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt @@ -1,31 +1,25 @@ package net.corda.client.rpc +import com.codahale.metrics.ConsoleReporter import com.codahale.metrics.Gauge import com.codahale.metrics.JmxReporter import com.codahale.metrics.MetricRegistry -import com.esotericsoftware.kryo.Kryo -import com.esotericsoftware.kryo.Serializer -import com.esotericsoftware.kryo.io.Input -import com.esotericsoftware.kryo.io.Output -import com.esotericsoftware.kryo.pool.KryoPool import com.google.common.base.Stopwatch import net.corda.client.rpc.internal.RPCClientConfiguration import net.corda.core.messaging.RPCOps -import net.corda.core.millis -import net.corda.core.random63BitValue +import net.corda.core.minutes +import net.corda.core.seconds +import net.corda.core.utilities.Rate +import net.corda.core.utilities.div import net.corda.node.driver.ShutdownManager import net.corda.node.services.messaging.RPCServerConfiguration -import net.corda.nodeapi.RPCApi -import net.corda.nodeapi.RPCKryo import net.corda.testing.RPCDriverExposedDSLInterface import net.corda.testing.measure import net.corda.testing.rpcDriver -import org.apache.activemq.artemis.api.core.SimpleString import org.junit.Ignore import org.junit.Test import org.junit.runner.RunWith import org.junit.runners.Parameterized -import rx.Observable import java.time.Duration import java.util.* import java.util.concurrent.* @@ -140,23 +134,24 @@ class RPCPerformanceTests : AbstractRPCTest() { @Test fun `consumption rate`() { rpcDriver { - val metricRegistry = startJmxReporter() + val metricRegistry = startReporter() val proxy = testProxy( RPCClientConfiguration.default.copy( - reapIntervalMs = 100, - cacheConcurrencyLevel = 16 + reapInterval = 1.seconds, + cacheConcurrencyLevel = 16, + producerPoolBound = 8 ), RPCServerConfiguration.default.copy( - rpcThreadPoolSize = 4, - consumerPoolSize = 4, - producerPoolBound = 4 + rpcThreadPoolSize = 8, + consumerPoolSize = 1, + producerPoolBound = 8 ) ) measurePerformancePublishMetrics( metricRegistry = metricRegistry, - parallelism = 4, - overallDurationSecond = 120.0, - injectionRatePerSecond = 20000.0, + parallelism = 8, + overallDuration = 5.minutes, + injectionRate = 20000L / TimeUnit.SECONDS, queueSizeMetricName = "$mode.QueueSize", workDurationMetricName = "$mode.WorkDuration", shutdownManager = this.shutdownManager, @@ -205,8 +200,8 @@ class RPCPerformanceTests : AbstractRPCTest() { fun measurePerformancePublishMetrics( metricRegistry: MetricRegistry, parallelism: Int, - overallDurationSecond: Double, - injectionRatePerSecond: Double, + overallDuration: Duration, + injectionRate: Rate, queueSizeMetricName: String, workDurationMetricName: String, shutdownManager: ShutdownManager, @@ -238,7 +233,7 @@ fun measurePerformancePublishMetrics( } val injector = executor.scheduleAtFixedRate( { - workSemaphore.release(injectionRatePerSecond.toInt()) + workSemaphore.release((injectionRate * TimeUnit.SECONDS).toInt()) }, 0, 1, @@ -251,7 +246,7 @@ fun measurePerformancePublishMetrics( workExecutor.awaitTermination(1, TimeUnit.SECONDS) executor.awaitTermination(1, TimeUnit.SECONDS) } - Thread.sleep((overallDurationSecond * 1000).toLong()) + Thread.sleep(overallDuration.toMillis()) } fun startInjectorWithBoundedQueue( @@ -289,7 +284,7 @@ fun startInjectorWithBoundedQueue( injector.join() } -fun RPCDriverExposedDSLInterface.startJmxReporter(): MetricRegistry { +fun RPCDriverExposedDSLInterface.startReporter(): MetricRegistry { val metricRegistry = MetricRegistry() val jmxReporter = thread { JmxReporter. @@ -307,9 +302,14 @@ fun RPCDriverExposedDSLInterface.startJmxReporter(): MetricRegistry { build(). start() } + val consoleReporter = thread { + ConsoleReporter.forRegistry(metricRegistry).build().start(1, TimeUnit.SECONDS) + } shutdownManager.registerShutdown { jmxReporter.interrupt() + consoleReporter.interrupt() jmxReporter.join() + consoleReporter.join() } return metricRegistry } diff --git a/core/src/main/kotlin/net/corda/core/serialization/Kryo.kt b/core/src/main/kotlin/net/corda/core/serialization/Kryo.kt index 95b82c118d..d744ea574d 100644 --- a/core/src/main/kotlin/net/corda/core/serialization/Kryo.kt +++ b/core/src/main/kotlin/net/corda/core/serialization/Kryo.kt @@ -148,13 +148,19 @@ fun T.serialize(kryo: KryoPool = p2PKryo(), internalOnly: Boolean = fa } -private val serializeBufferPool = LazyPool { ByteArray(64 * 1024) } -private val serializeOutputStreamPool = LazyPool(ByteArrayOutputStream::reset) { ByteArrayOutputStream(64 * 1024) } +private val serializeBufferPool = LazyPool( + newInstance = { ByteArray(64 * 1024) } +) +private val serializeOutputStreamPool = LazyPool( + clear = ByteArrayOutputStream::reset, + shouldReturnToPool = { it.size() < 256 * 1024 }, // Discard if it grew too large + newInstance = { ByteArrayOutputStream(64 * 1024) } +) fun T.serialize(kryo: Kryo, internalOnly: Boolean = false): SerializedBytes { return serializeOutputStreamPool.run { stream -> serializeBufferPool.run { buffer -> Output(buffer).use { - it.setOutputStream(stream) + it.outputStream = stream it.writeBytes(KryoHeaderV0_1.bytes) kryo.writeClassAndObject(it, this) } diff --git a/core/src/main/kotlin/net/corda/core/utilities/LazyPool.kt b/core/src/main/kotlin/net/corda/core/utilities/LazyPool.kt index a9df98637e..1a1abebdca 100644 --- a/core/src/main/kotlin/net/corda/core/utilities/LazyPool.kt +++ b/core/src/main/kotlin/net/corda/core/utilities/LazyPool.kt @@ -1,23 +1,28 @@ package net.corda.core.utilities +import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.LinkedBlockingQueue -import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.Semaphore /** * A lazy pool of resources [A]. * * @param clear If specified this function will be run on each borrowed instance before handing it over. + * @param shouldReturnToPool If specified this function will be run on each release to determine whether the instance + * should be returned to the pool for reuse. This may be useful for pooled resources that dynamically grow during + * usage, and we may not want to retain them forever. * @param bound If specified the pool will be bounded. Once all instances are borrowed subsequent borrows will block until an * instance is released. - * @param create The function to call to lazily create a pooled resource. + * @param newInstance The function to call to lazily newInstance a pooled resource. */ class LazyPool( private val clear: ((A) -> Unit)? = null, + private val shouldReturnToPool: ((A) -> Boolean)? = null, private val bound: Int? = null, - private val create: () -> A + private val newInstance: () -> A ) { - private val poolQueue = LinkedBlockingQueue() - private var poolSize = 0 + private val poolQueue = ConcurrentLinkedQueue() + private val poolSemaphore = Semaphore(bound ?: Int.MAX_VALUE) private enum class State { STARTED, @@ -32,23 +37,10 @@ class LazyPool( fun borrow(): A { lifeCycle.requireState(State.STARTED) + poolSemaphore.acquire() val pooled = poolQueue.poll() if (pooled == null) { - if (bound != null) { - val waitForRelease = synchronized(this) { - if (poolSize < bound) { - poolSize++ - false - } else { - true - } - } - if (waitForRelease) { - // Wait until one is released - return clearIfNeeded(poolQueue.take()) - } - } - return create() + return newInstance() } else { return clearIfNeeded(pooled) } @@ -56,7 +48,10 @@ class LazyPool( fun release(instance: A) { lifeCycle.requireState(State.STARTED) - poolQueue.add(instance) + if (shouldReturnToPool == null || shouldReturnToPool.invoke(instance)) { + poolQueue.add(instance) + } + poolSemaphore.release() } /** diff --git a/core/src/main/kotlin/net/corda/core/utilities/LazyStickyPool.kt b/core/src/main/kotlin/net/corda/core/utilities/LazyStickyPool.kt index 298279f09f..cec52e1842 100644 --- a/core/src/main/kotlin/net/corda/core/utilities/LazyStickyPool.kt +++ b/core/src/main/kotlin/net/corda/core/utilities/LazyStickyPool.kt @@ -6,10 +6,17 @@ import java.util.concurrent.LinkedBlockingQueue /** * A [LazyStickyPool] is a lazy pool of resources where a [borrow] may "stick" the borrowed instance to an object. * Any subsequent borrows using the same object will return the same pooled instance. + * + * @param size The size of the pool. + * @param shouldReturnToPool If specified this function will be run on each release to determine whether the instance + * should be returned to the pool for reuse. This may be useful for pooled resources that dynamically grow during + * usage, and we may not want to retain them forever. + * @param newInstance The function to call to create a pooled resource. */ // TODO This could be implemented more efficiently. Currently the "non-sticky" use case is not optimised, it just chooses a random instance to wait on. class LazyStickyPool( size: Int, + private val shouldReturnToPool: ((A) -> Boolean)? = null, private val newInstance: () -> A ) { private class InstanceBox { @@ -45,7 +52,12 @@ class LazyStickyPool( fun release(stickTo: Any, instance: A) { val box = boxes[toIndex(stickTo)] - box.instance!!.add(instance) + if (shouldReturnToPool == null || shouldReturnToPool.invoke(instance)) { + box.instance!!.add(instance) + } else { + // We need to create a new instance instead of setting the queue to null to unblock potentially waiting threads. + box.instance!!.add(newInstance()) + } } inline fun run(stickToOrNull: Any? = null, withInstance: (A) -> R): R { diff --git a/core/src/main/kotlin/net/corda/core/utilities/Rate.kt b/core/src/main/kotlin/net/corda/core/utilities/Rate.kt new file mode 100644 index 0000000000..1936a27fa3 --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/utilities/Rate.kt @@ -0,0 +1,29 @@ +package net.corda.core.utilities + +import java.time.Duration +import java.time.temporal.ChronoUnit +import java.util.concurrent.TimeUnit + +/** + * [Rate] holds a quantity denoting the frequency of some event e.g. 100 times per second or 2 times per day. + */ +data class Rate( + val numberOfEvents: Long, + val perTimeUnit: TimeUnit +) { + /** + * Returns the interval between two subsequent events. + */ + fun toInterval(): Duration { + return Duration.of(TimeUnit.NANOSECONDS.convert(1, perTimeUnit) / numberOfEvents, ChronoUnit.NANOS) + } + + /** + * Converts the number of events to the given unit. + */ + operator fun times(inUnit: TimeUnit): Long { + return inUnit.convert(numberOfEvents, perTimeUnit) + } +} + +operator fun Long.div(timeUnit: TimeUnit) = Rate(this, timeUnit)