mirror of
https://github.com/corda/corda.git
synced 2025-04-19 08:36:39 +00:00
ENT-2431 Add caching metrics, consolidate tracing config and better default cache sizes (#1418)
This commit is contained in:
parent
532d95ccac
commit
8bbc0d9f43
@ -4,7 +4,6 @@ import com.github.benmanes.caffeine.cache.Cache
|
||||
import com.github.benmanes.caffeine.cache.CacheLoader
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import com.github.benmanes.caffeine.cache.LoadingCache
|
||||
import net.corda.core.internal.profiling.CacheTracing.Companion.wrap
|
||||
|
||||
/**
|
||||
* Restrict the allowed characters of a cache name - this ensures that each cache has a name, and that
|
||||
@ -21,14 +20,16 @@ private val allowedChars = Regex("^[0-9A-Za-z_.]*\$")
|
||||
* This allows to easily add tweaks to all caches built in Corda, and also forces
|
||||
* cache users to give their cache a (meaningful) name that can be used e.g. for
|
||||
* capturing cache traces etc.
|
||||
*
|
||||
* Currently it is not used in this version of CORDA, but there are plans to do so.
|
||||
*/
|
||||
|
||||
fun <K, V> Caffeine<in K, in V>.buildNamed(name: String): Cache<K, V> {
|
||||
checkCacheName(name)
|
||||
return wrap(this.build<K, V>(), name)
|
||||
return this.build<K, V>()
|
||||
}
|
||||
|
||||
fun <K, V> Caffeine<in K, in V>.buildNamed(name: String, loader: CacheLoader<K, V>): LoadingCache<K, V> {
|
||||
checkCacheName(name)
|
||||
return wrap(this.build<K, V>(loader), name)
|
||||
return this.build<K, V>(loader)
|
||||
}
|
||||
|
@ -95,16 +95,6 @@ class CacheTracing {
|
||||
data class CacheTracingConfig(val enabled: Boolean, val targetDir: Path, val converter: (key: Any?) -> Long)
|
||||
|
||||
companion object {
|
||||
var cacheTracingConfig: CacheTracingConfig? = null
|
||||
|
||||
fun <K, V> wrap(cache: Cache<K, V>, name: String): Cache<K, V> {
|
||||
return wrap(cache, cacheTracingConfig, name)
|
||||
}
|
||||
|
||||
fun <K, V> wrap(cache: LoadingCache<K, V>, name: String): LoadingCache<K, V> {
|
||||
return wrap(cache, cacheTracingConfig, name)
|
||||
}
|
||||
|
||||
fun <K, V> wrap(cache: Cache<K, V>, config: CacheTracingConfig?, traceName: String): Cache<K, V> {
|
||||
return if (config != null && config.enabled) TracingCacheWrapper(cache, getCollector(config.targetDir, traceName, config.converter)) else cache
|
||||
}
|
||||
|
@ -62,7 +62,8 @@ import net.corda.node.services.transactions.InMemoryTransactionVerifierService
|
||||
import net.corda.node.services.upgrade.ContractUpgradeServiceImpl
|
||||
import net.corda.node.services.vault.NodeVaultService
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.node.utilities.DefaultNamedCacheFactory
|
||||
import net.corda.node.utilities.EnterpriseNamedCacheFactory
|
||||
import net.corda.node.utilities.profiling.getTracingConfig
|
||||
import net.corda.nodeapi.internal.NodeInfoAndSigned
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.isH2Database
|
||||
@ -118,7 +119,7 @@ class FlowWorkerServiceHub(override val configuration: NodeConfiguration, overri
|
||||
}
|
||||
|
||||
private val metricRegistry = MetricRegistry()
|
||||
override val cacheFactory = DefaultNamedCacheFactory().bindWithConfig(configuration).bindWithMetrics(metricRegistry).tokenize()
|
||||
override val cacheFactory = EnterpriseNamedCacheFactory(configuration.enterpriseConfiguration.getTracingConfig()).bindWithConfig(configuration).bindWithMetrics(metricRegistry).tokenize()
|
||||
|
||||
override val schemaService = NodeSchemaService(cordappLoader.cordappSchemas, false).tokenize()
|
||||
override val identityService = PersistentIdentityService(cacheFactory).tokenize()
|
||||
@ -144,7 +145,7 @@ class FlowWorkerServiceHub(override val configuration: NodeConfiguration, overri
|
||||
override val keyManagementService = PersistentKeyManagementService(cacheFactory, identityService, database).tokenize()
|
||||
private val servicesForResolution = ServicesForResolutionImpl(identityService, attachments, cordappProvider, validatedTransactions)
|
||||
@Suppress("LeakingThis")
|
||||
override val vaultService = NodeVaultService(clock, keyManagementService, servicesForResolution, database, schemaService).tokenize()
|
||||
override val vaultService = NodeVaultService(clock, keyManagementService, servicesForResolution, database, schemaService, cacheFactory).tokenize()
|
||||
override val nodeProperties = NodePropertiesPersistentStore(StubbedNodeUniqueIdProvider::value, database)
|
||||
val flowLogicRefFactory = FlowLogicRefFactoryImpl(cordappLoader.appClassLoader)
|
||||
override val monitoringService = MonitoringService(metricRegistry).tokenize()
|
||||
|
@ -47,7 +47,8 @@ import net.corda.node.services.persistence.NodeAttachmentService
|
||||
import net.corda.node.services.persistence.NodePropertiesPersistentStore
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.node.services.vault.NodeVaultService
|
||||
import net.corda.node.utilities.DefaultNamedCacheFactory
|
||||
import net.corda.node.utilities.EnterpriseNamedCacheFactory
|
||||
import net.corda.node.utilities.profiling.getTracingConfig
|
||||
import net.corda.nodeapi.internal.NodeInfoAndSigned
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.isH2Database
|
||||
@ -76,7 +77,7 @@ class RpcWorkerServiceHub(override val configuration: NodeConfiguration, overrid
|
||||
private val runOnStop = ArrayList<() -> Any?>()
|
||||
|
||||
private val metricRegistry = MetricRegistry()
|
||||
override val cacheFactory = DefaultNamedCacheFactory().bindWithConfig(configuration).bindWithMetrics(metricRegistry)
|
||||
override val cacheFactory = EnterpriseNamedCacheFactory(configuration.enterpriseConfiguration.getTracingConfig()).bindWithConfig(configuration).bindWithMetrics(metricRegistry)
|
||||
|
||||
override val schemaService = NodeSchemaService(cordappLoader.cordappSchemas, false)
|
||||
override val identityService = PersistentIdentityService(cacheFactory)
|
||||
@ -104,7 +105,7 @@ class RpcWorkerServiceHub(override val configuration: NodeConfiguration, overrid
|
||||
override val keyManagementService = PersistentKeyManagementService(cacheFactory, identityService, database)
|
||||
private val servicesForResolution = ServicesForResolutionImpl(identityService, attachments, cordappProvider, validatedTransactions)
|
||||
@Suppress("LeakingThis")
|
||||
override val vaultService = NodeVaultService(clock, keyManagementService, servicesForResolution, database, schemaService)
|
||||
override val vaultService = NodeVaultService(clock, keyManagementService, servicesForResolution, database, schemaService, cacheFactory)
|
||||
override val nodeProperties = NodePropertiesPersistentStore(StubbedNodeUniqueIdProvider::value, database)
|
||||
override val monitoringService = MonitoringService(metricRegistry)
|
||||
override val networkMapUpdater = NetworkMapUpdater(
|
||||
|
@ -943,7 +943,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
services,
|
||||
database,
|
||||
schemaService,
|
||||
configuration.transactionCacheSizeBytes
|
||||
cacheFactory
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -10,7 +10,6 @@ import net.corda.core.crypto.newSecureRandom
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.Emoji
|
||||
import net.corda.core.internal.concurrent.thenMatch
|
||||
import net.corda.core.internal.profiling.CacheTracing
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.VersionInfo
|
||||
@ -19,6 +18,7 @@ import net.corda.node.services.config.RelayConfiguration
|
||||
import net.corda.node.services.statemachine.MultiThreadedStateMachineExecutor
|
||||
import net.corda.node.services.statemachine.MultiThreadedStateMachineManager
|
||||
import net.corda.node.services.statemachine.StateMachineManager
|
||||
import net.corda.node.utilities.EnterpriseNamedCacheFactory
|
||||
import net.corda.node.utilities.profiling.getTracingConfig
|
||||
import org.fusesource.jansi.Ansi
|
||||
import org.fusesource.jansi.AnsiConsole
|
||||
@ -29,10 +29,8 @@ import java.util.concurrent.TimeUnit
|
||||
|
||||
open class EnterpriseNode(configuration: NodeConfiguration,
|
||||
versionInfo: VersionInfo,
|
||||
initialiseSerialization: Boolean = true,
|
||||
// running this as a constructor default parameter as this is the first thing that will be run when instantiating the class
|
||||
initHelper: Unit = { CacheTracing.cacheTracingConfig = configuration.enterpriseConfiguration.getTracingConfig() }()
|
||||
) : Node(configuration, versionInfo, initialiseSerialization) {
|
||||
initialiseSerialization: Boolean = true
|
||||
) : Node(configuration, versionInfo, initialiseSerialization, cacheFactoryPrototype = EnterpriseNamedCacheFactory(configuration.enterpriseConfiguration.getTracingConfig())) {
|
||||
companion object {
|
||||
private val logger by lazy { loggerFor<EnterpriseNode>() }
|
||||
|
||||
|
@ -10,7 +10,6 @@ import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||
import java.time.Instant
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.TimeUnit
|
||||
import javax.persistence.Column
|
||||
import javax.persistence.Entity
|
||||
import javax.persistence.Id
|
||||
@ -29,7 +28,7 @@ class P2PMessageDeduplicator(cacheFactory: NamedCacheFactory, private val databa
|
||||
private val processedMessages = createProcessedMessages(cacheFactory)
|
||||
// We add the peer to the key, so other peers cannot attempt malicious meddling with sequence numbers.
|
||||
// Expire after 7 days since we last touched an entry, to avoid infinite growth.
|
||||
private val senderUUIDSeqNoHWM: MutableMap<SenderKey, SenderHashToSeqNo> = Caffeine.newBuilder().expireAfterAccess(7, TimeUnit.DAYS).build<SenderKey, SenderHashToSeqNo>().asMap()
|
||||
private val senderUUIDSeqNoHWM: MutableMap<SenderKey, SenderHashToSeqNo> = cacheFactory.buildNamed<SenderKey, SenderHashToSeqNo>(Caffeine.newBuilder(), "P2PMessageDeduplicator_senderUUIDSeqNoHWM").asMap()
|
||||
|
||||
private fun createProcessedMessages(cacheFactory: NamedCacheFactory): AppendOnlyPersistentMap<DeduplicationId, MessageMeta, ProcessedMessage, String> {
|
||||
return AppendOnlyPersistentMap(
|
||||
|
@ -21,10 +21,9 @@ import net.corda.core.transactions.*
|
||||
import net.corda.core.utilities.*
|
||||
import net.corda.node.services.api.SchemaService
|
||||
import net.corda.node.services.api.VaultServiceInternal
|
||||
import net.corda.node.services.config.KB
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.schema.PersistentStateService
|
||||
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||
import net.corda.node.utilities.NamedCacheFactory
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
|
||||
import net.corda.nodeapi.internal.persistence.currentDBSession
|
||||
@ -61,7 +60,7 @@ class NodeVaultService(
|
||||
private val servicesForResolution: ServicesForResolution,
|
||||
private val database: CordaPersistence,
|
||||
private val schemaService: SchemaService,
|
||||
transactionCacheSizeBytes: Long = NodeConfiguration.defaultTransactionCacheSize
|
||||
cacheFactory: NamedCacheFactory
|
||||
) : SingletonSerializeAsToken(), VaultServiceInternal {
|
||||
private companion object {
|
||||
private val log = contextLogger()
|
||||
@ -87,10 +86,9 @@ class NodeVaultService(
|
||||
private val contractStateTypeMappings = mutableMapOf<String, MutableSet<String>>()
|
||||
|
||||
/**
|
||||
* This caches what states are in the vault for a particular transaction. Size the cache based on one entry per 8KB of transaction cache.
|
||||
* This size results in minimum of 1024.
|
||||
* This caches what states are in the vault for a particular transaction.
|
||||
*/
|
||||
private val producedStatesMapping = Caffeine.newBuilder().maximumSize(transactionCacheSizeBytes / 8.KB).buildNamed<SecureHash, BitSet>("NodeVaultService_producedStates")
|
||||
private val producedStatesMapping = cacheFactory.buildNamed<SecureHash, BitSet>(Caffeine.newBuilder(), "NodeVaultService_producedStates")
|
||||
|
||||
override fun start() {
|
||||
criteriaBuilder = database.hibernateConfig.sessionFactoryForRegisteredSchemas.criteriaBuilder
|
||||
|
@ -0,0 +1,152 @@
|
||||
package net.corda.node.utilities
|
||||
|
||||
import com.codahale.metrics.Counter
|
||||
import com.codahale.metrics.Gauge
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.codahale.metrics.Timer
|
||||
import com.github.benmanes.caffeine.cache.Cache
|
||||
import com.github.benmanes.caffeine.cache.CacheLoader
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import com.github.benmanes.caffeine.cache.LoadingCache
|
||||
import com.github.benmanes.caffeine.cache.stats.CacheStats
|
||||
import com.github.benmanes.caffeine.cache.stats.StatsCounter
|
||||
import net.corda.core.internal.buildNamed
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.function.Supplier
|
||||
|
||||
/**
|
||||
* Helper to export statistics to JMX and finally build the cache.
|
||||
*/
|
||||
fun <K, V> Caffeine<in K, in V>.buildNamed(registry: MetricRegistry, metricsPrefix: String): Cache<K, V> = this.addMetrics(registry, metricsPrefix).buildNamed<K, V>(metricsPrefix).addExtraMetrics(registry, metricsPrefix)
|
||||
|
||||
fun <K, V> Caffeine<in K, in V>.buildNamed(registry: MetricRegistry, metricsPrefix: String, loader: CacheLoader<K, V>): LoadingCache<K, V> = this.recordStats(CaffeineStatsCounter.supplier(registry, "Caches/$metricsPrefix")).buildNamed<K, V>(metricsPrefix, loader).addExtraMetrics(registry, metricsPrefix)
|
||||
|
||||
private fun <K, V> Caffeine<in K, in V>.addMetrics(registry: MetricRegistry, metricsPrefix: String): Caffeine<in K, in V> = this.recordStats(CaffeineStatsCounter.supplier(registry, "Caches/$metricsPrefix"))
|
||||
private fun <K, V, C : Cache<K, V>> C.addExtraMetrics(registry: MetricRegistry, metricsPrefix: String): C = this.apply {
|
||||
registry.gauge("Caches/$metricsPrefix.size") {
|
||||
object : Gauge<Long> {
|
||||
override fun getValue(): Long {
|
||||
return this@apply.estimatedSize()
|
||||
}
|
||||
}
|
||||
}
|
||||
this@apply.policy().eviction().ifPresent {
|
||||
if (it.isWeighted) {
|
||||
registry.gauge("Caches/$metricsPrefix.maximum-weight") {
|
||||
object : Gauge<Long> {
|
||||
override fun getValue(): Long {
|
||||
return it.maximum
|
||||
}
|
||||
}
|
||||
}
|
||||
registry.gauge("Caches/$metricsPrefix.weight") {
|
||||
object : Gauge<Long> {
|
||||
override fun getValue(): Long {
|
||||
return it.weightedSize().asLong
|
||||
}
|
||||
}
|
||||
}
|
||||
registry.gauge("Caches/$metricsPrefix.weightPercent") {
|
||||
object : Gauge<Long> {
|
||||
override fun getValue(): Long {
|
||||
return minOf((it.weightedSize().asLong * 100L) / it.maximum, 100L)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
registry.gauge("Caches/$metricsPrefix.maximum-size") {
|
||||
object : Gauge<Long> {
|
||||
override fun getValue(): Long {
|
||||
return it.maximum
|
||||
}
|
||||
}
|
||||
}
|
||||
registry.gauge("Caches/$metricsPrefix.sizePercent") {
|
||||
object : Gauge<Long> {
|
||||
override fun getValue(): Long {
|
||||
return minOf((this@apply.estimatedSize() * 100L) / it.maximum, 100L)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* A {@link StatsCounter} backed by Dropwizard Metrics.
|
||||
*/
|
||||
class CaffeineStatsCounter : StatsCounter {
|
||||
private val hitCount: Counter
|
||||
private val missCount: Counter
|
||||
private val loadSuccessCount: Counter
|
||||
private val loadFailureCount: Counter
|
||||
private val totalLoadTime: Timer
|
||||
private val evictionCount: Counter
|
||||
private val evictionWeight: Counter
|
||||
|
||||
companion object {
|
||||
/**
|
||||
* Creates a supplier of instances for use by a single cache.
|
||||
*
|
||||
* @param registry the registry of metric instances
|
||||
* @param metricsPrefix the prefix name for the metrics
|
||||
*/
|
||||
fun supplier(registry: MetricRegistry, metricsPrefix: String): Supplier<StatsCounter> {
|
||||
return Supplier<StatsCounter> { CaffeineStatsCounter(registry, metricsPrefix) }
|
||||
}
|
||||
}
|
||||
|
||||
private constructor(registry: MetricRegistry, metricsPrefix: String) {
|
||||
hitCount = registry.counter("$metricsPrefix.hits")
|
||||
missCount = registry.counter("$metricsPrefix.misses")
|
||||
totalLoadTime = registry.timer("$metricsPrefix.loads")
|
||||
loadSuccessCount = registry.counter("$metricsPrefix.loads-success")
|
||||
loadFailureCount = registry.counter("$metricsPrefix.loads-failure")
|
||||
evictionCount = registry.counter("$metricsPrefix.evictions")
|
||||
evictionWeight = registry.counter("$metricsPrefix.evictions-weight")
|
||||
}
|
||||
|
||||
override fun recordHits(count: Int) {
|
||||
hitCount.inc(count.toLong())
|
||||
}
|
||||
|
||||
override fun recordMisses(count: Int) {
|
||||
missCount.inc(count.toLong())
|
||||
}
|
||||
|
||||
override fun recordLoadSuccess(loadTime: Long) {
|
||||
loadSuccessCount.inc()
|
||||
totalLoadTime.update(loadTime, TimeUnit.NANOSECONDS)
|
||||
}
|
||||
|
||||
override fun recordLoadFailure(loadTime: Long) {
|
||||
loadFailureCount.inc()
|
||||
totalLoadTime.update(loadTime, TimeUnit.NANOSECONDS)
|
||||
}
|
||||
|
||||
override fun recordEviction() {
|
||||
// This method is scheduled for removal in version 3.0 in favor of recordEviction(weight)
|
||||
recordEviction(1)
|
||||
}
|
||||
|
||||
override fun recordEviction(weight: Int) {
|
||||
evictionCount.inc()
|
||||
evictionWeight.inc(weight.toLong())
|
||||
}
|
||||
|
||||
override fun snapshot(): CacheStats {
|
||||
return CacheStats(
|
||||
hitCount.getCount(),
|
||||
missCount.getCount(),
|
||||
loadSuccessCount.getCount(),
|
||||
loadFailureCount.getCount(),
|
||||
totalLoadTime.getCount(),
|
||||
evictionCount.getCount(),
|
||||
evictionWeight.getCount())
|
||||
}
|
||||
|
||||
override fun toString(): String {
|
||||
return snapshot().toString()
|
||||
}
|
||||
}
|
@ -0,0 +1,51 @@
|
||||
package net.corda.node.utilities
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.github.benmanes.caffeine.cache.Cache
|
||||
import com.github.benmanes.caffeine.cache.CacheLoader
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import com.github.benmanes.caffeine.cache.LoadingCache
|
||||
import net.corda.core.internal.profiling.CacheTracing
|
||||
import net.corda.core.internal.profiling.CacheTracing.Companion.wrap
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.node.services.config.KB
|
||||
import net.corda.node.services.config.MB
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
|
||||
class EnterpriseNamedCacheFactory private constructor(private val tracingConfig: CacheTracing.CacheTracingConfig, private val metricRegistry: MetricRegistry?, private val nodeConfiguration: NodeConfiguration?) : NamedCacheFactory, SingletonSerializeAsToken() {
|
||||
constructor(tracingConfig: CacheTracing.CacheTracingConfig) : this(tracingConfig, null, null)
|
||||
|
||||
override fun bindWithMetrics(metricRegistry: MetricRegistry): NamedCacheFactory = EnterpriseNamedCacheFactory(tracingConfig, metricRegistry, this.nodeConfiguration)
|
||||
override fun bindWithConfig(nodeConfiguration: NodeConfiguration): NamedCacheFactory = EnterpriseNamedCacheFactory(tracingConfig, this.metricRegistry, nodeConfiguration)
|
||||
|
||||
// Scale most caches off the transaction cache size.
|
||||
private fun defaultBound(nodeConfiguration: NodeConfiguration): Long = nodeConfiguration.transactionCacheSizeBytes / 8.KB
|
||||
|
||||
// This result in the minimum being 10MB as per OS, but it then grows as per the transaction cache.
|
||||
private fun defaultAttachmentCacheBound(nodeConfiguration: NodeConfiguration): Long = nodeConfiguration.transactionCacheSizeBytes + 2.MB
|
||||
|
||||
// This results in a minium of 1024 entries as per OS, but then grows linearly with attachment cache size.
|
||||
private fun defaultAttachmentPresenceCacheBound(nodeConfiguration: NodeConfiguration): Long = defaultAttachmentCacheBound(nodeConfiguration) / 10.KB
|
||||
|
||||
override fun <K, V> buildNamed(caffeine: Caffeine<in K, in V>, name: String): Cache<K, V> {
|
||||
checkNotNull(metricRegistry)
|
||||
checkNotNull(nodeConfiguration)
|
||||
return wrap(caffeine.maximumSize(defaultBound(nodeConfiguration!!)).buildNamed<K, V>(metricRegistry!!, name), tracingConfig, name)
|
||||
}
|
||||
|
||||
// TODO: allow a config file override for any named cache.
|
||||
override fun <K, V> buildNamed(caffeine: Caffeine<in K, in V>, name: String, loader: CacheLoader<K, V>): LoadingCache<K, V> {
|
||||
checkNotNull(metricRegistry)
|
||||
checkNotNull(nodeConfiguration)
|
||||
val configuredCaffeine = when (name) {
|
||||
"DBTransactionStorage_transactions" -> caffeine.maximumWeight(nodeConfiguration!!.transactionCacheSizeBytes)
|
||||
"NodeAttachmentService_attachmentContent" -> caffeine.maximumWeight(defaultAttachmentCacheBound(nodeConfiguration!!))
|
||||
"NodeAttachmentService_attachmentPresence" -> caffeine.maximumSize(defaultAttachmentPresenceCacheBound(nodeConfiguration!!))
|
||||
"P2PMessageDeduplicator_senderUUIDSeqNoHWM" -> caffeine.expireAfterAccess(7, TimeUnit.DAYS)
|
||||
else -> caffeine.maximumSize(defaultBound(nodeConfiguration!!))
|
||||
}
|
||||
return wrap(configuredCaffeine.buildNamed<K, V>(metricRegistry!!, name, loader), tracingConfig, name)
|
||||
}
|
||||
}
|
@ -37,6 +37,7 @@ import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.node.services.schema.PersistentStateService
|
||||
import net.corda.node.services.vault.NodeVaultService
|
||||
import net.corda.node.services.vault.VaultSchemaV1
|
||||
import net.corda.node.utilities.TestingNamedCacheFactory
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.nodeapi.internal.persistence.HibernateConfiguration
|
||||
@ -121,7 +122,7 @@ class HibernateConfigurationTest {
|
||||
services = object : MockServices(cordappPackages, BOB_NAME, rigorousMock<IdentityServiceInternal>().also {
|
||||
doNothing().whenever(it).justVerifyAndRegisterIdentity(argThat { name == BOB_NAME }, any())
|
||||
}, generateKeyPair(), dummyNotary.keyPair) {
|
||||
override val vaultService = NodeVaultService(Clock.systemUTC(), keyManagementService, servicesForResolution, database, schemaService).apply { start() }
|
||||
override val vaultService = NodeVaultService(Clock.systemUTC(), keyManagementService, servicesForResolution, database, schemaService, TestingNamedCacheFactory()).apply { start() }
|
||||
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {
|
||||
for (stx in txs) {
|
||||
(validatedTransactions as WritableTransactionStorage).addTransaction(stx)
|
||||
|
@ -249,7 +249,7 @@ open class MockServices private constructor(
|
||||
}
|
||||
|
||||
internal fun makeVaultService(schemaService: SchemaService, database: CordaPersistence): VaultServiceInternal {
|
||||
return NodeVaultService(clock, keyManagementService, servicesForResolution, database, schemaService).apply { start() }
|
||||
return NodeVaultService(clock, keyManagementService, servicesForResolution, database, schemaService, EnterpriseMockNamedCachedFactory()).apply { start() }
|
||||
}
|
||||
|
||||
// This needs to be internal as MutableClassToInstanceMap is a guava type and shouldn't be part of our public API
|
||||
|
@ -0,0 +1,36 @@
|
||||
package net.corda.testing.node.internal
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.github.benmanes.caffeine.cache.Cache
|
||||
import com.github.benmanes.caffeine.cache.CacheLoader
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import com.github.benmanes.caffeine.cache.LoadingCache
|
||||
import net.corda.core.internal.buildNamed
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.node.services.config.MB
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.utilities.NamedCacheFactory
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
class EnterpriseMockNamedCachedFactory(private val sizeOverride: Long, private val metricRegistry: MetricRegistry?, private val nodeConfiguration: NodeConfiguration?) : NamedCacheFactory, SingletonSerializeAsToken() {
|
||||
constructor(sizeOverride: Long = 1024) : this(sizeOverride, null, null)
|
||||
|
||||
override fun bindWithMetrics(metricRegistry: MetricRegistry): NamedCacheFactory = EnterpriseMockNamedCachedFactory(sizeOverride, metricRegistry, this.nodeConfiguration)
|
||||
override fun bindWithConfig(nodeConfiguration: NodeConfiguration): NamedCacheFactory = EnterpriseMockNamedCachedFactory(sizeOverride, this.metricRegistry, nodeConfiguration)
|
||||
|
||||
override fun <K, V> buildNamed(caffeine: Caffeine<in K, in V>, name: String): Cache<K, V> {
|
||||
// Does not check metricRegistry or nodeConfiguration, because for tests we don't care.
|
||||
return caffeine.maximumSize(sizeOverride).buildNamed<K, V>(name)
|
||||
}
|
||||
|
||||
override fun <K, V> buildNamed(caffeine: Caffeine<in K, in V>, name: String, loader: CacheLoader<K, V>): LoadingCache<K, V> {
|
||||
// Does not check metricRegistry or nodeConfiguration, because for tests we don't care.
|
||||
val configuredCaffeine = when (name) {
|
||||
"DBTransactionStorage_transactions" -> caffeine.maximumWeight(1.MB)
|
||||
"NodeAttachmentService_attachmentContent" -> caffeine.maximumWeight(1.MB)
|
||||
"P2PMessageDeduplicator_senderUUIDSeqNoHWM" -> caffeine.expireAfterAccess(1, TimeUnit.HOURS)
|
||||
else -> caffeine.maximumSize(sizeOverride)
|
||||
}
|
||||
return configuredCaffeine.buildNamed<K, V>(name, loader)
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user