diff --git a/build.gradle b/build.gradle
index 0d2d8ec9a4..1ee0c3a42a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -46,6 +46,7 @@ buildscript {
     ext.log4j_version = '2.9.1'
     ext.bouncycastle_version = constants.getProperty("bouncycastleVersion")
     ext.guava_version = constants.getProperty("guavaVersion")
+    ext.caffeine_version = constants.getProperty("caffeineVersion")
     ext.okhttp_version = '3.5.0'
     ext.netty_version = '4.1.9.Final'
     ext.typesafe_config_version = constants.getProperty("typesafeConfigVersion")
diff --git a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NetworkIdentityModel.kt b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NetworkIdentityModel.kt
index 5451f2acce..181fa895e6 100644
--- a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NetworkIdentityModel.kt
+++ b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NetworkIdentityModel.kt
@@ -1,7 +1,6 @@
 package net.corda.client.jfx.model
 
-import com.google.common.cache.CacheBuilder
-import com.google.common.cache.CacheLoader
+import com.github.benmanes.caffeine.cache.Caffeine
 import javafx.beans.value.ObservableValue
 import javafx.collections.FXCollections
 import javafx.collections.ObservableList
@@ -32,8 +31,8 @@ class NetworkIdentityModel {
     private val rpcProxy by observableValue(NodeMonitorModel::proxyObservable)
 
-    private val identityCache = CacheBuilder.newBuilder()
-            .build>(CacheLoader.from { publicKey ->
+    private val identityCache = Caffeine.newBuilder()
+            .build>({ publicKey ->
                 publicKey?.let { rpcProxy.map { it?.nodeInfoFromParty(AnonymousParty(publicKey)) } }
             })
 
     val notaries = ChosenList(rpcProxy.map { FXCollections.observableList(it?.notaryIdentities() ?: emptyList()) })
@@ -42,5 +41,5 @@ class NetworkIdentityModel {
             .filtered { it.legalIdentities.all { it !in notaries } }
     val myIdentity = rpcProxy.map { it?.nodeInfo()?.legalIdentitiesAndCerts?.first()?.party }
 
-    fun partyFromPublicKey(publicKey: PublicKey): ObservableValue = identityCache[publicKey]
+    fun partyFromPublicKey(publicKey: PublicKey): ObservableValue = identityCache[publicKey]!!
 }
diff --git a/client/rpc/build.gradle b/client/rpc/build.gradle
index 3b585cf164..d60a6eb8d5 100644
--- a/client/rpc/build.gradle
+++ b/client/rpc/build.gradle
@@ -68,6 +68,9 @@ dependencies {
     compile project(':core')
     compile project(':node-api')
 
+    // For caches rather than guava
+    compile "com.github.ben-manes.caffeine:caffeine:$caffeine_version"
+
     // Unit testing helpers.
     testCompile "org.jetbrains.kotlin:kotlin-test:$kotlin_version"
     testCompile "junit:junit:$junit_version"
diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt
index 79ad14f5dd..26d027e287 100644
--- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt
+++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt
@@ -44,12 +44,6 @@ data class RPCClientConfiguration(
         val reapInterval: Duration,
         /** The number of threads to use for observations (for executing [Observable.onNext]) */
         val observationExecutorPoolSize: Int,
-        /**
-         * Determines the concurrency level of the Observable Cache. This is exposed because it implicitly determines
-         * the limit on the number of leaked observables reaped because of garbage collection per reaping.
-         * See the implementation of [com.google.common.cache.LocalCache] for details.
-         */
-        val cacheConcurrencyLevel: Int,
         /** The retry interval of artemis connections in milliseconds */
         val connectionRetryInterval: Duration,
         /** The retry interval multiplier for exponential backoff */
@@ -71,7 +65,6 @@ data class RPCClientConfiguration(
                 trackRpcCallSites = false,
                 reapInterval = 1.seconds,
                 observationExecutorPoolSize = 4,
-                cacheConcurrencyLevel = 8,
                 connectionRetryInterval = 5.seconds,
                 connectionRetryIntervalMultiplier = 1.5,
                 connectionMaxRetryInterval = 3.minutes,
diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt
index 54b931ed8d..6964fe493d 100644
--- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt
+++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt
@@ -1,13 +1,14 @@
 package net.corda.client.rpc.internal
 
+import co.paralleluniverse.common.util.SameThreadExecutor
 import com.esotericsoftware.kryo.Kryo
 import com.esotericsoftware.kryo.Serializer
 import com.esotericsoftware.kryo.io.Input
 import com.esotericsoftware.kryo.io.Output
-import com.google.common.cache.Cache
-import com.google.common.cache.CacheBuilder
-import com.google.common.cache.RemovalCause
-import com.google.common.cache.RemovalListener
+import com.github.benmanes.caffeine.cache.Cache
+import com.github.benmanes.caffeine.cache.Caffeine
+import com.github.benmanes.caffeine.cache.RemovalCause
+import com.github.benmanes.caffeine.cache.RemovalListener
 import com.google.common.util.concurrent.SettableFuture
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 import net.corda.client.rpc.RPCException
@@ -142,10 +143,10 @@ class RPCClientProxyHandler(
     private val serializationContextWithObservableContext = RpcClientObservableSerializer.createContext(serializationContext, observableContext)
 
     private fun createRpcObservableMap(): RpcObservableMap {
-        val onObservableRemove = RemovalListener>> {
-            val observableId = it.key!!
+        val onObservableRemove = RemovalListener>> { key, value, cause ->
+            val observableId = key!!
             val rpcCallSite = callSiteMap?.remove(observableId)
-            if (it.cause == RemovalCause.COLLECTED) {
+            if (cause == RemovalCause.COLLECTED) {
                 log.warn(listOf(
                         "A hot observable returned from an RPC was never subscribed to.",
                         "This wastes server-side resources because it was queueing observations for retrieval.",
@@ -156,10 +157,9 @@ class RPCClientProxyHandler(
             }
             observablesToReap.locked { observables.add(observableId) }
         }
-        return CacheBuilder.newBuilder().
-                weakValues().
-                removalListener(onObservableRemove).
-                concurrencyLevel(rpcConfiguration.cacheConcurrencyLevel).
+        return Caffeine.newBuilder().
+                weakValues().
+                removalListener(onObservableRemove).executor(SameThreadExecutor.getExecutor()).
                 build()
     }
diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt
index 5ea77a66fd..fe405b37af 100644
--- a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt
+++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt
@@ -91,8 +91,7 @@ class RPCConcurrencyTests : AbstractRPCTest() {
         return testProxy(
                 TestOpsImpl(pool),
                 clientConfiguration = RPCClientConfiguration.default.copy(
-                        reapInterval = 100.millis,
-                        cacheConcurrencyLevel = 16
+                        reapInterval = 100.millis
                 ),
                 serverConfiguration = RPCServerConfiguration.default.copy(
                         rpcThreadPoolSize = 4
diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt
index 689674d22f..0211d0b3ec 100644
--- a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt
+++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt
@@ -87,7 +87,6 @@ class RPCPerformanceTests : AbstractRPCTest() {
         rpcDriver {
             val proxy = testProxy(
                     RPCClientConfiguration.default.copy(
-                            cacheConcurrencyLevel = 16,
                             observationExecutorPoolSize = 2
                     ),
                     RPCServerConfiguration.default.copy(
@@ -127,8 +126,7 @@ class RPCPerformanceTests : AbstractRPCTest() {
             val metricRegistry = startReporter(shutdownManager)
             val proxy = testProxy(
                     RPCClientConfiguration.default.copy(
-                            reapInterval = 1.seconds,
-                            cacheConcurrencyLevel = 16
+                            reapInterval = 1.seconds
                     ),
                     RPCServerConfiguration.default.copy(
                             rpcThreadPoolSize = 8
diff --git a/constants.properties b/constants.properties
index 3f219493b2..134d656085 100644
--- a/constants.properties
+++ b/constants.properties
@@ -7,3 +7,4 @@ typesafeConfigVersion=1.3.1
 jsr305Version=3.0.2
 artifactoryPluginVersion=4.4.18
 snakeYamlVersion=1.19
+caffeineVersion=2.6.2
diff --git a/node-api/build.gradle b/node-api/build.gradle
index 6cc03bd117..405221b374 100644
--- a/node-api/build.gradle
+++ b/node-api/build.gradle
@@ -40,6 +40,9 @@ dependencies {
     // Pure-Java Snappy compression
     compile 'org.iq80.snappy:snappy:0.4'
 
+    // For caches rather than guava
+    compile "com.github.ben-manes.caffeine:caffeine:$caffeine_version"
+
     // Unit testing helpers.
testCompile "junit:junit:$junit_version" testCompile "org.assertj:assertj-core:$assertj_version" diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/DeduplicationChecker.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/DeduplicationChecker.kt index b35b8922a3..86742c6325 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/DeduplicationChecker.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/DeduplicationChecker.kt @@ -1,7 +1,7 @@ package net.corda.nodeapi.internal -import com.google.common.cache.CacheBuilder -import com.google.common.cache.CacheLoader +import com.github.benmanes.caffeine.cache.CacheLoader +import com.github.benmanes.caffeine.cache.Caffeine import java.time.Duration import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicLong @@ -11,11 +11,11 @@ import java.util.concurrent.atomic.AtomicLong */ class DeduplicationChecker(cacheExpiry: Duration) { // dedupe identity -> watermark cache - private val watermarkCache = CacheBuilder.newBuilder() + private val watermarkCache = Caffeine.newBuilder() .expireAfterAccess(cacheExpiry.toNanos(), TimeUnit.NANOSECONDS) .build(WatermarkCacheLoader) - private object WatermarkCacheLoader : CacheLoader() { + private object WatermarkCacheLoader : CacheLoader { override fun load(key: Any) = AtomicLong(-1) } @@ -25,6 +25,7 @@ class DeduplicationChecker(cacheExpiry: Duration) { * @return true if the message is unique, false if it's a duplicate. */ fun checkDuplicateMessageId(identity: Any, sequenceNumber: Long): Boolean { - return watermarkCache[identity].getAndUpdate { maxOf(sequenceNumber, it) } >= sequenceNumber + return watermarkCache[identity]!!.getAndUpdate { maxOf(sequenceNumber, it) } >= sequenceNumber } } + diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt index a093bc871b..369978bb62 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt @@ -1,7 +1,7 @@ package net.corda.nodeapi.internal.serialization -import com.google.common.cache.Cache -import com.google.common.cache.CacheBuilder +import com.github.benmanes.caffeine.cache.Cache +import com.github.benmanes.caffeine.cache.Caffeine import net.corda.core.contracts.Attachment import net.corda.core.crypto.SecureHash import net.corda.core.internal.copyBytes @@ -30,7 +30,7 @@ data class SerializationContextImpl @JvmOverloads constructor(override val prefe override val useCase: SerializationContext.UseCase, override val encoding: SerializationEncoding?, override val encodingWhitelist: EncodingWhitelist = NullEncodingWhitelist) : SerializationContext { - private val cache: Cache, AttachmentsClassLoader> = CacheBuilder.newBuilder().weakValues().maximumSize(1024).build() + private val cache: Cache, AttachmentsClassLoader> = Caffeine.newBuilder().weakValues().maximumSize(1024).build() /** * {@inheritDoc} @@ -49,7 +49,7 @@ data class SerializationContextImpl @JvmOverloads constructor(override val prefe } missing.isNotEmpty() && throw MissingAttachmentsException(missing) AttachmentsClassLoader(attachments, parent = deserializationClassLoader) - }) + }!!) } catch (e: ExecutionException) { // Caught from within the cache get, so unwrap. throw e.cause!! 
diff --git a/node/build.gradle b/node/build.gradle
index 74cad1b2ef..2158c98b2c 100644
--- a/node/build.gradle
+++ b/node/build.gradle
@@ -79,6 +79,9 @@ dependencies {
 
     compile "com.google.guava:guava:$guava_version"
 
+    // For caches rather than guava
+    compile "com.github.ben-manes.caffeine:caffeine:$caffeine_version"
+
     // JOpt: for command line flags.
     compile "net.sf.jopt-simple:jopt-simple:$jopt_simple_version"
diff --git a/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManagerImpl.kt b/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManagerImpl.kt
index 4b1daabdde..eec5d8ff33 100644
--- a/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManagerImpl.kt
+++ b/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManagerImpl.kt
@@ -1,7 +1,8 @@
 package net.corda.node.internal.security
 
-import com.google.common.cache.CacheBuilder
-import com.google.common.cache.Cache
+
+import com.github.benmanes.caffeine.cache.Cache
+import com.github.benmanes.caffeine.cache.Caffeine
 import com.google.common.primitives.Ints
 import net.corda.core.context.AuthServiceId
 import net.corda.core.utilities.loggerFor
@@ -94,7 +95,7 @@ class RPCSecurityManagerImpl(config: AuthServiceConfig) : RPCSecurityManager {
         return DefaultSecurityManager(realm).also {
             // Setup optional cache layer if configured
             it.cacheManager = config.options?.cache?.let {
-                GuavaCacheManager(
+                CaffeineCacheManager(
                         timeToLiveSeconds = it.expireAfterSecs,
                         maxSize = it.maxEntries)
             }
@@ -257,9 +258,9 @@ private class NodeJdbcRealm(config: SecurityConfiguration.AuthService.DataSource
 private typealias ShiroCache = org.apache.shiro.cache.Cache
 
 /*
- * Adapts a [com.google.common.cache.Cache] to a [org.apache.shiro.cache.Cache] implementation.
+ * Adapts a [com.github.benmanes.caffeine.cache.Cache] to a [org.apache.shiro.cache.Cache] implementation.
  */
-private fun Cache.toShiroCache(name: String) = object : ShiroCache {
+private fun Cache.toShiroCache(name: String) = object : ShiroCache {
     val name = name
     private val impl = this@toShiroCache
 
@@ -282,7 +283,7 @@ private fun Cache.toShiroCache(name: String) = object : ShiroCache<
         impl.invalidateAll()
     }
 
-    override fun size() = Ints.checkedCast(impl.size())
+    override fun size() = Ints.checkedCast(impl.estimatedSize())
     override fun keys() = impl.asMap().keys
     override fun values() = impl.asMap().values
     override fun toString() = "Guava cache adapter [$impl]"
@@ -290,22 +291,22 @@
 
 /*
  * Implementation of [org.apache.shiro.cache.CacheManager] based on
- * cache implementation in [com.google.common.cache]
+ * cache implementation in [com.github.benmanes.caffeine.cache.Cache]
 */
-private class GuavaCacheManager(val maxSize: Long,
-                                val timeToLiveSeconds: Long) : CacheManager {
+private class CaffeineCacheManager(val maxSize: Long,
+                                   val timeToLiveSeconds: Long) : CacheManager {
 
     private val instances = ConcurrentHashMap>()
 
-    override fun getCache(name: String): ShiroCache {
+    override fun getCache(name: String): ShiroCache {
         val result = instances[name] ?: buildCache(name)
         instances.putIfAbsent(name, result)
         return result as ShiroCache
     }
 
-    private fun buildCache(name: String) : ShiroCache {
+    private fun buildCache(name: String): ShiroCache {
         logger.info("Constructing cache '$name' with maximumSize=$maxSize, TTL=${timeToLiveSeconds}s")
-        return CacheBuilder.newBuilder()
+        return Caffeine.newBuilder()
                 .expireAfterWrite(timeToLiveSeconds, TimeUnit.SECONDS)
                 .maximumSize(maxSize)
                 .build()
@@ -313,6 +314,6 @@ private class GuavaCacheManager(val maxSize: Long,
     }
 
     companion object {
-        private val logger = loggerFor()
+        private val logger = loggerFor()
     }
 }
\ No newline at end of file
diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt
index 657de9535e..e70f80d39d 100644
--- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt
+++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt
@@ -1,12 +1,13 @@
 package net.corda.node.services.messaging
 
+import co.paralleluniverse.common.util.SameThreadExecutor
 import com.esotericsoftware.kryo.Kryo
 import com.esotericsoftware.kryo.Serializer
 import com.esotericsoftware.kryo.io.Input
 import com.esotericsoftware.kryo.io.Output
-import com.google.common.cache.Cache
-import com.google.common.cache.CacheBuilder
-import com.google.common.cache.RemovalListener
+import com.github.benmanes.caffeine.cache.Cache
+import com.github.benmanes.caffeine.cache.Caffeine
+import com.github.benmanes.caffeine.cache.RemovalListener
 import com.google.common.collect.HashMultimap
 import com.google.common.collect.Multimaps
 import com.google.common.collect.SetMultimap
@@ -145,11 +146,11 @@ class RPCServer(
     }
 
     private fun createObservableSubscriptionMap(): ObservableSubscriptionMap {
-        val onObservableRemove = RemovalListener {
-            log.debug { "Unsubscribing from Observable with id ${it.key} because of ${it.cause}" }
-            it.value.subscription.unsubscribe()
+        val onObservableRemove = RemovalListener { key, value, cause ->
+            log.debug { "Unsubscribing from Observable with id ${key} because of ${cause}" }
+            value!!.subscription.unsubscribe()
         }
-        return CacheBuilder.newBuilder().removalListener(onObservableRemove).build()
+        return Caffeine.newBuilder().removalListener(onObservableRemove).executor(SameThreadExecutor.getExecutor()).build()
     }
 
     fun start(activeMqServerControl: ActiveMQServerControl) {
diff --git a/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt
index 19fb9fd875..caefa2d3d6 100644
--- a/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt
+++ b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt
@@ -164,9 +164,9 @@ open class PersistentNetworkMapCache(
 
     override fun getNodesByLegalName(name: CordaX500Name): List = database.transaction { queryByLegalName(session, name) }
-    override fun getNodesByLegalIdentityKey(identityKey: PublicKey): List = nodesByKeyCache[identityKey]
+    override fun getNodesByLegalIdentityKey(identityKey: PublicKey): List = nodesByKeyCache[identityKey]!!
 
-    private val nodesByKeyCache = NonInvalidatingCache>(1024, 8, { key -> database.transaction { queryByIdentityKey(session, key) } })
+    private val nodesByKeyCache = NonInvalidatingCache>(1024, { key -> database.transaction { queryByIdentityKey(session, key) } })
 
     override fun getNodesByOwningKeyIndex(identityKeyIndex: String): List {
         return database.transaction {
@@ -176,9 +176,9 @@ open class PersistentNetworkMapCache(
 
     override fun getNodeByAddress(address: NetworkHostAndPort): NodeInfo? = database.transaction { queryByAddress(session, address) }
-    override fun getPeerCertificateByLegalName(name: CordaX500Name): PartyAndCertificate? = identityByLegalNameCache.get(name).orElse(null)
+    override fun getPeerCertificateByLegalName(name: CordaX500Name): PartyAndCertificate? = identityByLegalNameCache.get(name)!!.orElse(null)
 
-    private val identityByLegalNameCache = NonInvalidatingCache>(1024, 8, { name -> Optional.ofNullable(database.transaction { queryIdentityByLegalName(session, name) }) })
+    private val identityByLegalNameCache = NonInvalidatingCache>(1024, { name -> Optional.ofNullable(database.transaction { queryIdentityByLegalName(session, name) }) })
 
     override fun track(): DataFeed, MapChange> {
         synchronized(_changed) {
diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/NodeAttachmentService.kt b/node/src/main/kotlin/net/corda/node/services/persistence/NodeAttachmentService.kt
index 9593cfc634..e4ea68a1e6 100644
--- a/node/src/main/kotlin/net/corda/node/services/persistence/NodeAttachmentService.kt
+++ b/node/src/main/kotlin/net/corda/node/services/persistence/NodeAttachmentService.kt
@@ -1,7 +1,7 @@
 package net.corda.node.services.persistence
 
 import com.codahale.metrics.MetricRegistry
-import com.google.common.cache.Weigher
+import com.github.benmanes.caffeine.cache.Weigher
 import com.google.common.hash.HashCode
 import com.google.common.hash.Hashing
 import com.google.common.hash.HashingInputStream
@@ -24,7 +24,6 @@ import net.corda.node.services.config.NodeConfiguration
 import net.corda.node.services.vault.HibernateAttachmentQueryCriteriaParser
 import net.corda.node.utilities.NonInvalidatingCache
 import net.corda.node.utilities.NonInvalidatingWeightBasedCache
-import net.corda.node.utilities.defaultCordaCacheConcurrencyLevel
 import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
 import net.corda.nodeapi.internal.persistence.currentDBSession
 import net.corda.nodeapi.internal.withContractsInJar
@@ -209,7 +208,6 @@ class NodeAttachmentService(
     private val attachmentContentCache = NonInvalidatingWeightBasedCache>>(
             maxWeight = attachmentContentCacheSize,
-            concurrencyLevel = defaultCordaCacheConcurrencyLevel,
             weigher = object : Weigher>> {
                 override fun weigh(key: SecureHash, value: Optional>): Int {
                     return key.size + if (value.isPresent) value.get().second.size else 0
@@ -234,12 +232,11 @@ class NodeAttachmentService(
 
     private val attachmentCache = NonInvalidatingCache>(
             attachmentCacheBound,
-            defaultCordaCacheConcurrencyLevel,
             { key -> Optional.ofNullable(createAttachment(key)) }
     )
 
     private fun createAttachment(key: SecureHash): Attachment? {
-        val content = attachmentContentCache.get(key)
+        val content = attachmentContentCache.get(key)!!
         if (content.isPresent) {
             return content.get().first
         }
@@ -249,7 +246,7 @@ class NodeAttachmentService(
     }
 
     override fun openAttachment(id: SecureHash): Attachment? {
-        val attachment = attachmentCache.get(id)
+        val attachment = attachmentCache.get(id)!!
         if (attachment.isPresent) {
             return attachment.get()
         }
diff --git a/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt b/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt
index c6b875dbd9..ad3df17747 100644
--- a/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt
+++ b/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt
@@ -1,7 +1,7 @@
 package net.corda.node.utilities
 
-import com.google.common.cache.LoadingCache
-import com.google.common.cache.Weigher
+import com.github.benmanes.caffeine.cache.LoadingCache
+import com.github.benmanes.caffeine.cache.Weigher
 import net.corda.core.utilities.contextLogger
 import net.corda.nodeapi.internal.persistence.currentDBSession
 import java.util.*
@@ -29,7 +29,7 @@ abstract class AppendOnlyPersistentMapBase(
      * Returns the value associated with the key, first loading that value from the storage if necessary.
     */
     operator fun get(key: K): V? {
-        return cache.get(key).orElse(null)
+        return cache.get(key)!!.orElse(null)
     }
 
     val size get() = allPersisted().toList().size
@@ -62,7 +62,7 @@ abstract class AppendOnlyPersistentMapBase(
                 } else {
                     Optional.of(value)
                 }
-            }
+            }!!
             if (!insertionAttempt) {
                 if (existingInCache.isPresent) {
                     // Key already exists in cache, do nothing.
@@ -71,7 +71,7 @@ abstract class AppendOnlyPersistentMapBase(
                     // This happens when the key was queried before with no value associated. We invalidate the cached null
                     // value and recursively call set again. This is to avoid race conditions where another thread queries after
                     // the invalidate but before the set.
-                    cache.invalidate(key)
+                    cache.invalidate(key!!)
                     return set(key, value, logWarning, store)
                 }
             }
@@ -148,7 +148,6 @@ class AppendOnlyPersistentMap(
     //TODO determine cacheBound based on entity class later or with node config allowing tuning, or using some heuristic based on heap size
     override val cache = NonInvalidatingCache>(
             bound = cacheBound,
-            concurrencyLevel = 8,
             loadFunction = { key -> Optional.ofNullable(loadValue(key)) })
 }
 
@@ -166,7 +165,6 @@ class WeightBasedAppendOnlyPersistentMap(
         persistentEntityClass) {
     override val cache = NonInvalidatingWeightBasedCache>(
             maxWeight = maxWeight,
-            concurrencyLevel = 8,
             weigher = object : Weigher> {
                 override fun weigh(key: K, value: Optional): Int {
                     return weighingFunc(key, value)
diff --git a/node/src/main/kotlin/net/corda/node/utilities/NonInvalidatingCache.kt b/node/src/main/kotlin/net/corda/node/utilities/NonInvalidatingCache.kt
index f08a0631cb..d8840bacc2 100644
--- a/node/src/main/kotlin/net/corda/node/utilities/NonInvalidatingCache.kt
+++ b/node/src/main/kotlin/net/corda/node/utilities/NonInvalidatingCache.kt
@@ -1,29 +1,28 @@
 package net.corda.node.utilities
 
-import com.google.common.cache.CacheBuilder
-import com.google.common.cache.CacheLoader
-import com.google.common.cache.LoadingCache
-import com.google.common.cache.Weigher
-import com.google.common.util.concurrent.ListenableFuture
+import com.github.benmanes.caffeine.cache.CacheLoader
+import com.github.benmanes.caffeine.cache.Caffeine
+import com.github.benmanes.caffeine.cache.LoadingCache
+import com.github.benmanes.caffeine.cache.Weigher
 
 class NonInvalidatingCache private constructor(
         val cache: LoadingCache
 ) : LoadingCache by cache {
 
-    constructor(bound: Long, concurrencyLevel: Int, loadFunction: (K) -> V) :
-            this(buildCache(bound, concurrencyLevel, loadFunction))
+    constructor(bound: Long, loadFunction: (K) -> V) :
+            this(buildCache(bound, loadFunction))
 
     private companion object {
-        private fun buildCache(bound: Long, concurrencyLevel: Int, loadFunction: (K) -> V): LoadingCache {
-            val builder = CacheBuilder.newBuilder().maximumSize(bound).concurrencyLevel(concurrencyLevel)
+        private fun buildCache(bound: Long, loadFunction: (K) -> V): LoadingCache {
+            val builder = Caffeine.newBuilder().maximumSize(bound)
             return builder.build(NonInvalidatingCacheLoader(loadFunction))
         }
     }
 
     // TODO look into overriding loadAll() if we ever use it
-    class NonInvalidatingCacheLoader(val loadFunction: (K) -> V) : CacheLoader() {
-        override fun reload(key: K, oldValue: V): ListenableFuture {
+    class NonInvalidatingCacheLoader(val loadFunction: (K) -> V) : CacheLoader {
+        override fun reload(key: K, oldValue: V): V {
             throw IllegalStateException("Non invalidating cache refreshed")
         }
@@ -34,16 +33,14 @@ class NonInvalidatingCache private constructor(
 class NonInvalidatingWeightBasedCache private constructor(
         val cache: LoadingCache
 ) : LoadingCache by cache {
-    constructor (maxWeight: Long, concurrencyLevel: Int, weigher: Weigher, loadFunction: (K) -> V) :
-            this(buildCache(maxWeight, concurrencyLevel, weigher, loadFunction))
+    constructor (maxWeight: Long, weigher: Weigher, loadFunction: (K) -> V) :
+            this(buildCache(maxWeight, weigher, loadFunction))
 
     private companion object {
-        private fun buildCache(maxWeight: Long, concurrencyLevel: Int, weigher: Weigher, loadFunction: (K) -> V): LoadingCache {
-            val builder = CacheBuilder.newBuilder().maximumWeight(maxWeight).weigher(weigher).concurrencyLevel(concurrencyLevel)
+        private fun buildCache(maxWeight: Long, weigher: Weigher, loadFunction: (K) -> V): LoadingCache {
+            val builder = Caffeine.newBuilder().maximumWeight(maxWeight).weigher(weigher)
             return builder.build(NonInvalidatingCache.NonInvalidatingCacheLoader(loadFunction))
         }
     }
-}
-
-val defaultCordaCacheConcurrencyLevel: Int = 8
\ No newline at end of file
+}
\ No newline at end of file
diff --git a/node/src/main/kotlin/net/corda/node/utilities/NonInvalidatingUnboundCache.kt b/node/src/main/kotlin/net/corda/node/utilities/NonInvalidatingUnboundCache.kt
index ea16bfd064..b1fef1d2e6 100644
--- a/node/src/main/kotlin/net/corda/node/utilities/NonInvalidatingUnboundCache.kt
+++ b/node/src/main/kotlin/net/corda/node/utilities/NonInvalidatingUnboundCache.kt
@@ -1,21 +1,23 @@
 package net.corda.node.utilities
 
-import com.google.common.cache.*
-import com.google.common.util.concurrent.ListenableFuture
-
+import co.paralleluniverse.common.util.SameThreadExecutor
+import com.github.benmanes.caffeine.cache.CacheLoader
+import com.github.benmanes.caffeine.cache.Caffeine
+import com.github.benmanes.caffeine.cache.LoadingCache
+import com.github.benmanes.caffeine.cache.RemovalListener
 
 class NonInvalidatingUnboundCache private constructor(
         val cache: LoadingCache
 ) : LoadingCache by cache {
 
-    constructor(concurrencyLevel: Int, loadFunction: (K) -> V, removalListener: RemovalListener = RemovalListener {},
+    constructor(loadFunction: (K) -> V, removalListener: RemovalListener = RemovalListener { key, value, cause -> },
                 keysToPreload: () -> Iterable = { emptyList() }) :
-            this(buildCache(concurrencyLevel, loadFunction, removalListener, keysToPreload))
+            this(buildCache(loadFunction, removalListener, keysToPreload))
 
     private companion object {
-        private fun buildCache(concurrencyLevel: Int, loadFunction: (K) -> V, removalListener: RemovalListener,
+        private fun buildCache(loadFunction: (K) -> V, removalListener: RemovalListener,
                                keysToPreload: () -> Iterable): LoadingCache {
-            val builder = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).removalListener(removalListener)
+            val builder = Caffeine.newBuilder().removalListener(removalListener).executor(SameThreadExecutor.getExecutor())
             return builder.build(NonInvalidatingCacheLoader(loadFunction)).apply {
                 getAll(keysToPreload())
             }
@@ -23,8 +25,8 @@ class NonInvalidatingUnboundCache private constructor(
     }
 
     // TODO look into overriding loadAll() if we ever use it
-    private class NonInvalidatingCacheLoader(val loadFunction: (K) -> V) : CacheLoader() {
-        override fun reload(key: K, oldValue: V): ListenableFuture {
+    private class NonInvalidatingCacheLoader(val loadFunction: (K) -> V) : CacheLoader {
+        override fun reload(key: K, oldValue: V): V {
             throw IllegalStateException("Non invalidating cache refreshed")
         }
diff --git a/node/src/main/kotlin/net/corda/node/utilities/PersistentMap.kt b/node/src/main/kotlin/net/corda/node/utilities/PersistentMap.kt
index c023d113fb..d205e2be15 100644
--- a/node/src/main/kotlin/net/corda/node/utilities/PersistentMap.kt
+++ b/node/src/main/kotlin/net/corda/node/utilities/PersistentMap.kt
@@ -1,8 +1,7 @@
 package net.corda.node.utilities
 
-import com.google.common.cache.RemovalCause
-import com.google.common.cache.RemovalListener
-import com.google.common.cache.RemovalNotification
+import com.github.benmanes.caffeine.cache.RemovalCause
+import com.github.benmanes.caffeine.cache.RemovalListener
 import net.corda.core.utilities.contextLogger
 import net.corda.nodeapi.internal.persistence.currentDBSession
 import java.util.*
@@ -10,7 +9,7 @@ import java.util.*
 /**
 * Implements an unbound caching layer on top of a table accessed via Hibernate mapping.
 */
-class PersistentMap(
+class PersistentMap(
         val toPersistentEntityKey: (K) -> EK,
         val fromPersistentEntity: (E) -> Pair,
         val toPersistentEntity: (key: K, value: V) -> E,
@@ -22,7 +21,6 @@ class PersistentMap(
     }
 
     private val cache = NonInvalidatingUnboundCache(
-            concurrencyLevel = 8,
             loadFunction = { key -> Optional.ofNullable(loadValue(key)) },
             removalListener = ExplicitRemoval(toPersistentEntityKey, persistentEntityClass)
     ).apply {
@@ -34,11 +32,11 @@ class PersistentMap(
     }
 
     class ExplicitRemoval(private val toPersistentEntityKey: (K) -> EK, private val persistentEntityClass: Class) : RemovalListener {
-        override fun onRemoval(notification: RemovalNotification?) {
-            when (notification?.cause) {
+        override fun onRemoval(key: K?, value: V?, cause: RemovalCause) {
+            when (cause) {
                 RemovalCause.EXPLICIT -> {
                     val session = currentDBSession()
-                    val elem = session.find(persistentEntityClass, toPersistentEntityKey(notification.key))
+                    val elem = session.find(persistentEntityClass, toPersistentEntityKey(key!!))
                     if (elem != null) {
                         session.remove(elem)
                     }
@@ -53,14 +51,14 @@ class PersistentMap(
     }
 
     override operator fun get(key: K): V? {
-        return cache.get(key).orElse(null)
+        return cache.get(key)!!.orElse(null)
     }
 
     fun all(): Sequence> {
         return cache.asMap().asSequence().filter { it.value.isPresent }.map { Pair(it.key, it.value.get()) }
     }
 
-    override val size get() = cache.size().toInt()
+    override val size get() = cache.estimatedSize().toInt()
 
     private tailrec fun set(key: K, value: V, logWarning: Boolean = true, store: (K, V) -> V?, replace: (K, V) -> Unit): Boolean {
         var insertionAttempt = false
@@ -72,7 +70,7 @@ class PersistentMap(
                 // Store the value, depending on store implementation this may replace existing entry in DB.
                 store(key, value)
                 Optional.of(value)
-            }
+            }!!
             if (!insertionAttempt) {
                 if (existingInCache.isPresent) {
                     // Key already exists in cache, store the new value in the DB (depends on tore implementation) and refresh cache.
@@ -165,7 +163,7 @@ class PersistentMap(
     * Removes the mapping for the specified key from this map and underlying storage if present.
     */
     override fun remove(key: K): V? {
-        val result = cache.get(key).orElse(null)
+        val result = cache.get(key)!!.orElse(null)
         cache.invalidate(key)
         return result
     }
@@ -253,7 +251,7 @@ class PersistentMap(
     override fun put(key: K, value: V): V? {
         val old = cache.get(key)
         addWithDuplicatesReplaced(key, value)
-        return old.orElse(null)
+        return old!!.orElse(null)
     }
 
     fun load() {
diff --git a/tools/explorer/src/main/kotlin/net/corda/explorer/identicon/IdenticonRenderer.kt b/tools/explorer/src/main/kotlin/net/corda/explorer/identicon/IdenticonRenderer.kt
index 74f6998f93..5ec79549e8 100644
--- a/tools/explorer/src/main/kotlin/net/corda/explorer/identicon/IdenticonRenderer.kt
+++ b/tools/explorer/src/main/kotlin/net/corda/explorer/identicon/IdenticonRenderer.kt
@@ -1,8 +1,8 @@
 package net.corda.explorer.identicon
 
+import com.github.benmanes.caffeine.cache.CacheLoader
+import com.github.benmanes.caffeine.cache.Caffeine
 import com.google.common.base.Splitter
-import com.google.common.cache.CacheBuilder
-import com.google.common.cache.CacheLoader
 import javafx.scene.SnapshotParameters
 import javafx.scene.canvas.Canvas
 import javafx.scene.canvas.GraphicsContext
@@ -75,7 +75,7 @@ object IdenticonRenderer {
 
     private val renderingSize = 30.0
 
-    private val cache = CacheBuilder.newBuilder().build(CacheLoader.from { key ->
+    private val cache = Caffeine.newBuilder().build(CacheLoader { key ->
         key?.let { render(key.hashCode(), renderingSize) }
     })
 
@@ -92,7 +92,7 @@ object IdenticonRenderer {
     }
 
     fun getIdenticon(hash: SecureHash): Image {
-        return cache.get(hash)
+        return cache.get(hash)!!
    }
 
     /**
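// Illustrative sketch (not part of the patch): why the converted removal listeners above pair
// .removalListener(...) with .executor(SameThreadExecutor.getExecutor()). Guava invoked removal
// listeners synchronously during cache maintenance, whereas Caffeine by default dispatches them to
// ForkJoinPool.commonPool(); a same-thread executor preserves the synchronous behaviour that code
// such as PersistentMap.ExplicitRemoval (which uses the thread-bound currentDBSession()) relies on.
// The cache below (Long keys, rx.Subscription values) is a hypothetical stand-in.
import co.paralleluniverse.common.util.SameThreadExecutor
import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.RemovalListener
import rx.Subscription

fun buildSubscriptionCache(): Cache<Long, Subscription> {
    // Caffeine's listener receives (key, value, cause) directly, replacing Guava's RemovalNotification.
    val onRemove = RemovalListener<Long, Subscription> { key, value, cause ->
        println("Unsubscribing $key because of $cause")
        value?.unsubscribe()
    }
    return Caffeine.newBuilder()
            .removalListener(onRemove)
            .executor(SameThreadExecutor.getExecutor()) // run the listener on the evicting/invalidating thread
            .build()
}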