mirror of
https://github.com/corda/corda.git
synced 2025-03-15 16:46:12 +00:00
ENT-2414 Cache trace capture (#1372)
* Refactor cache tracing so it is available in core (without pulling in any extra dependencies * Wrap all named caches in trace wrapper (if configured to create traces) * Remove special case for NodeVaultService and initialise CacheTracing config as soon as possible. * Keep checkCacheName internal * Revert back to use Google Longs class rather than hand-rolled code. * Add comment explaining unusual location of initialisation code. * Code review rework
This commit is contained in:
parent
dea7d0b905
commit
cc58a0c34a
@ -4,6 +4,7 @@ import com.github.benmanes.caffeine.cache.Cache
|
||||
import com.github.benmanes.caffeine.cache.CacheLoader
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import com.github.benmanes.caffeine.cache.LoadingCache
|
||||
import net.corda.core.internal.profiling.CacheTracing.Companion.wrap
|
||||
|
||||
/**
|
||||
* Restrict the allowed characters of a cache name - this ensures that each cache has a name, and that
|
||||
@ -20,22 +21,20 @@ private val allowedChars = Regex("^[0-9A-Za-z_.]*\$")
|
||||
* This allows to easily add tweaks to all caches built in Corda, and also forces
|
||||
* cache users to give their cache a (meaningful) name that can be used e.g. for
|
||||
* capturing cache traces etc.
|
||||
*
|
||||
* Currently it is not used in this version of CORDA, but there are plans to do so.
|
||||
*/
|
||||
|
||||
fun <K, V> Caffeine<in K, in V>.buildNamed(name: String): Cache<K, V> {
|
||||
checkCacheName(name)
|
||||
return this.build<K, V>()
|
||||
return wrap(this.build<K, V>(), name)
|
||||
}
|
||||
|
||||
fun <K, V> Caffeine<in K, in V>.buildNamed(name: String, loadFunc: (K) -> V): LoadingCache<K, V> {
|
||||
checkCacheName(name)
|
||||
return this.build<K, V>(loadFunc)
|
||||
return wrap(this.build<K, V>(loadFunc), name)
|
||||
}
|
||||
|
||||
|
||||
fun <K, V> Caffeine<in K, in V>.buildNamed(name: String, loader: CacheLoader<K, V>): LoadingCache<K, V> {
|
||||
checkCacheName(name)
|
||||
return this.build<K, V>(loader)
|
||||
return wrap(this.build<K, V>(loader), name)
|
||||
}
|
||||
|
@ -1,12 +1,11 @@
|
||||
package net.corda.node.utilities.profiling
|
||||
package net.corda.core.internal.profiling
|
||||
|
||||
import com.github.benmanes.caffeine.cache.Cache
|
||||
import com.github.benmanes.caffeine.cache.LoadingCache
|
||||
import com.google.common.primitives.Longs
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.nodeapi.internal.addShutdownHook
|
||||
import java.io.File
|
||||
import java.io.FileOutputStream
|
||||
import java.nio.ByteBuffer
|
||||
import java.nio.file.Path
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.ConcurrentLinkedQueue
|
||||
@ -38,14 +37,16 @@ class CacheTracing {
|
||||
file.parentFile.mkdirs()
|
||||
}
|
||||
FileOutputStream(fileName, true).use {
|
||||
val buffer = ByteBuffer.allocate(java.lang.Long.BYTES)
|
||||
var firstRun = true // make sure the loop runs at least once (in case of very short lived process where the thread might be started after shutdown is initiated.
|
||||
while (running || firstRun) {
|
||||
Thread.sleep(100) // sleep first, then check for work (so that work arriving during sleep does not get lost in shutdown)
|
||||
var item: Long? = null
|
||||
while ({ item = queue.poll(); item }() != null) {
|
||||
it.write(Longs.toByteArray(item!!))
|
||||
buffer.putLong(item!!)
|
||||
it.write(buffer.array())
|
||||
buffer.clear()
|
||||
}
|
||||
firstRun = false
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -91,21 +92,35 @@ class CacheTracing {
|
||||
}
|
||||
}
|
||||
|
||||
data class CacheTracingConfig(val enabled: Boolean, val targetDir: Path, val converter: (key: Any?) -> Long)
|
||||
|
||||
companion object {
|
||||
fun <K, V> wrap(cache: Cache<K, V>, converter: (key: K) -> Long, config: CacheTracingConfig?, traceName: String): Cache<K, V> {
|
||||
return if (config != null && config.enabled) TracingCacheWrapper(cache, getCollector(config.targetDir, traceName, converter)) else cache
|
||||
var cacheTracingConfig: CacheTracingConfig? = null
|
||||
|
||||
fun <K, V> wrap(cache: Cache<K, V>, name: String): Cache<K, V> {
|
||||
return wrap(cache, cacheTracingConfig, name)
|
||||
}
|
||||
|
||||
fun <K, V> wrap(cache: LoadingCache<K, V>, converter: (key: K) -> Long, config: CacheTracingConfig?, traceName: String): LoadingCache<K, V> {
|
||||
return if (config != null && config.enabled) TracingLoadingCacheWrapper(cache, getCollector(config.targetDir, traceName, converter)) else cache
|
||||
fun <K, V> wrap(cache: LoadingCache<K, V>, name: String): LoadingCache<K, V> {
|
||||
return wrap(cache, cacheTracingConfig, name)
|
||||
}
|
||||
|
||||
fun <K, V> wrap(cache: Cache<K, V>, config: CacheTracingConfig?, traceName: String): Cache<K, V> {
|
||||
return if (config != null && config.enabled) TracingCacheWrapper(cache, getCollector(config.targetDir, traceName, config.converter)) else cache
|
||||
}
|
||||
|
||||
fun <K, V> wrap(cache: LoadingCache<K, V>, config: CacheTracingConfig?, traceName: String): LoadingCache<K, V> {
|
||||
return if (config != null && config.enabled) TracingLoadingCacheWrapper(cache, getCollector(config.targetDir, traceName, config.converter)) else cache
|
||||
}
|
||||
|
||||
private val collectors = ConcurrentHashMap<String, TraceCollector<*>>()
|
||||
|
||||
@Synchronized
|
||||
private fun <K> getCollector(targetDir: Path, traceName: String, converter: (key: K) -> Long): TraceCollector<K> {
|
||||
private fun <K> getCollector(targetDir: Path, traceName: String, converter: (key: Any?) -> Long): TraceCollector<K> {
|
||||
if (collectors.isEmpty()) {
|
||||
addShutdownHook { shutdown() }
|
||||
val hook = Thread { shutdown() }
|
||||
val runtime = Runtime.getRuntime()
|
||||
runtime.addShutdownHook(hook)
|
||||
}
|
||||
val fileName = targetDir.resolve("trace_$traceName.bin").toAbsolutePath().toString()
|
||||
@Suppress("UNCHECKED_CAST") // need to suppress this warning - no way to check against an erased type
|
@ -1,9 +1,8 @@
|
||||
package net.corda.node.utilities.profiling
|
||||
|
||||
package net.corda.core.internal.profiling
|
||||
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import com.google.common.primitives.Longs
|
||||
import net.corda.node.utilities.profiling.CacheTracing.Companion.wrap
|
||||
import net.corda.core.internal.profiling.CacheTracing.Companion.wrap
|
||||
import org.junit.Test
|
||||
import java.io.FileInputStream
|
||||
import kotlin.test.assertEquals
|
||||
@ -14,12 +13,12 @@ class CacheTracingTest {
|
||||
val tempDir = createTempDir()
|
||||
val cache = Caffeine.newBuilder().maximumSize(10).build<Long, Long>()
|
||||
|
||||
val wrappedCache = wrap(cache, { key: Long -> key }, CacheTracingConfig(true, tempDir.toPath()), "test")
|
||||
val wrappedCache = wrap(cache, CacheTracing.CacheTracingConfig(true, tempDir.toPath(), { key: Any? -> key as Long }), "test")
|
||||
|
||||
wrappedCache.put(1L, 1L)
|
||||
wrappedCache.putAll(mutableMapOf(2L to 2L, 3L to 3L))
|
||||
wrappedCache.get(4) { it }
|
||||
wrappedCache.getIfPresent(5)
|
||||
wrappedCache.get(4L) { it }
|
||||
wrappedCache.getIfPresent(5L)
|
||||
|
||||
CacheTracing.shutdown()
|
||||
|
||||
@ -33,12 +32,12 @@ class CacheTracingTest {
|
||||
val tempDir = createTempDir()
|
||||
val cache = Caffeine.newBuilder().maximumSize(10).build<Long, Long>()
|
||||
|
||||
val wrappedCache = wrap(cache, { key: Long -> key }, CacheTracingConfig(true, tempDir.toPath().resolve("foo/bar")), "test")
|
||||
val wrappedCache = wrap(cache, CacheTracing.CacheTracingConfig(true, tempDir.toPath().resolve("foo/bar"), { key: Any? -> key as Long }), "test")
|
||||
|
||||
wrappedCache.put(1L, 1L)
|
||||
wrappedCache.putAll(mutableMapOf(2L to 2L, 3L to 3L))
|
||||
wrappedCache.get(4) { it }
|
||||
wrappedCache.getIfPresent(5)
|
||||
wrappedCache.get(4L) { it }
|
||||
wrappedCache.getIfPresent(5L)
|
||||
|
||||
CacheTracing.shutdown()
|
||||
|
||||
@ -53,7 +52,7 @@ class CacheTracingTest {
|
||||
val tempDir = createTempDir()
|
||||
val cache = Caffeine.newBuilder().maximumSize(10).build<Long, Long>()
|
||||
|
||||
val wrappedCache = wrap(cache, { key: Long -> key }, CacheTracingConfig(true, tempDir.toPath()), "test")
|
||||
val wrappedCache = wrap(cache, CacheTracing.CacheTracingConfig(true, tempDir.toPath(), { key: Any? -> key as Long }), "test")
|
||||
|
||||
wrappedCache.put(1L, 1L)
|
||||
CacheTracing.shutdown()
|
||||
@ -72,14 +71,14 @@ class CacheTracingTest {
|
||||
val tempDir = createTempDir()
|
||||
val cache = Caffeine.newBuilder().maximumSize(10).build<Long, Long> { it }
|
||||
|
||||
val wrappedCache = wrap(cache, { key: Long -> key }, CacheTracingConfig(true, tempDir.toPath()), "test")
|
||||
val wrappedCache = wrap(cache, CacheTracing.CacheTracingConfig(true, tempDir.toPath(), { key: Any? -> key as Long }), "test")
|
||||
|
||||
wrappedCache.put(1L, 1L)
|
||||
wrappedCache.putAll(mutableMapOf(2L to 2L, 3L to 3L))
|
||||
wrappedCache.get(4)
|
||||
wrappedCache.getIfPresent(5)
|
||||
wrappedCache.getAll(listOf(1, 3))
|
||||
wrappedCache.refresh(3)
|
||||
wrappedCache.get(4L)
|
||||
wrappedCache.getIfPresent(5L)
|
||||
wrappedCache.getAll(listOf(1L, 3L))
|
||||
wrappedCache.refresh(3L)
|
||||
|
||||
CacheTracing.shutdown()
|
||||
|
@ -69,7 +69,6 @@ import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.node.utilities.JVMAgentRegistry
|
||||
import net.corda.node.utilities.NamedThreadFactory
|
||||
import net.corda.node.utilities.NodeBuildProperties
|
||||
import net.corda.node.utilities.profiling.getTracingConfig
|
||||
import net.corda.nodeapi.internal.DevIdentityGenerator
|
||||
import net.corda.nodeapi.internal.NodeInfoAndSigned
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
@ -909,8 +908,8 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
services,
|
||||
database,
|
||||
schemaService,
|
||||
configuration.transactionCacheSizeBytes,
|
||||
configuration.enterpriseConfiguration.getTracingConfig())
|
||||
configuration.transactionCacheSizeBytes
|
||||
)
|
||||
}
|
||||
|
||||
/** Load configured JVM agents */
|
||||
|
@ -10,6 +10,7 @@ import net.corda.core.crypto.newSecureRandom
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.Emoji
|
||||
import net.corda.core.internal.concurrent.thenMatch
|
||||
import net.corda.core.internal.profiling.CacheTracing
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.VersionInfo
|
||||
@ -18,6 +19,7 @@ import net.corda.node.services.config.RelayConfiguration
|
||||
import net.corda.node.services.statemachine.MultiThreadedStateMachineExecutor
|
||||
import net.corda.node.services.statemachine.MultiThreadedStateMachineManager
|
||||
import net.corda.node.services.statemachine.StateMachineManager
|
||||
import net.corda.node.utilities.profiling.getTracingConfig
|
||||
import org.fusesource.jansi.Ansi
|
||||
import org.fusesource.jansi.AnsiConsole
|
||||
import java.io.IOException
|
||||
@ -27,7 +29,9 @@ import java.util.concurrent.TimeUnit
|
||||
|
||||
open class EnterpriseNode(configuration: NodeConfiguration,
|
||||
versionInfo: VersionInfo,
|
||||
initialiseSerialization: Boolean = true
|
||||
initialiseSerialization: Boolean = true,
|
||||
// running this as a constructor default parameter as this is the first thing that will be run when instantiating the class
|
||||
initHelper: Unit = { CacheTracing.cacheTracingConfig = configuration.enterpriseConfiguration.getTracingConfig() }()
|
||||
) : Node(configuration, versionInfo, initialiseSerialization) {
|
||||
companion object {
|
||||
private val logger by lazy { loggerFor<EnterpriseNode>() }
|
||||
|
@ -24,8 +24,6 @@ import net.corda.node.services.config.KB
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.schema.PersistentStateService
|
||||
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||
import net.corda.node.utilities.profiling.CacheTracing.Companion.wrap
|
||||
import net.corda.node.utilities.profiling.CacheTracingConfig
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
|
||||
import net.corda.nodeapi.internal.persistence.currentDBSession
|
||||
@ -62,8 +60,7 @@ class NodeVaultService(
|
||||
private val servicesForResolution: ServicesForResolution,
|
||||
private val database: CordaPersistence,
|
||||
private val schemaService: SchemaService,
|
||||
transactionCacheSizeBytes: Long = NodeConfiguration.defaultTransactionCacheSize,
|
||||
cacheTraceConfig: CacheTracingConfig? = null
|
||||
transactionCacheSizeBytes: Long = NodeConfiguration.defaultTransactionCacheSize
|
||||
) : SingletonSerializeAsToken(), VaultServiceInternal {
|
||||
private companion object {
|
||||
private val log = contextLogger()
|
||||
@ -92,14 +89,7 @@ class NodeVaultService(
|
||||
* This caches what states are in the vault for a particular transaction. Size the cache based on one entry per 8KB of transaction cache.
|
||||
* This size results in minimum of 1024.
|
||||
*/
|
||||
private val producedStatesMapping = wrap(
|
||||
Caffeine.newBuilder().maximumSize(transactionCacheSizeBytes / 8.KB).build<SecureHash, BitSet>(),
|
||||
converter = { key: SecureHash -> longHash.hashBytes(key.bytes).asLong() },
|
||||
config = cacheTraceConfig,
|
||||
traceName = "vaulteservice"
|
||||
)
|
||||
|
||||
private val longHash = com.google.common.hash.Hashing.sipHash24()
|
||||
private val producedStatesMapping = Caffeine.newBuilder().maximumSize(transactionCacheSizeBytes / 8.KB).buildNamed<SecureHash, BitSet>("NodeVaultService_producedStates")
|
||||
|
||||
override fun start() {
|
||||
criteriaBuilder = database.hibernateConfig.sessionFactoryForRegisteredSchemas.criteriaBuilder
|
||||
|
@ -1,10 +1,19 @@
|
||||
package net.corda.node.utilities.profiling
|
||||
|
||||
import net.corda.core.internal.profiling.CacheTracing
|
||||
import net.corda.node.services.config.EnterpriseConfiguration
|
||||
import java.nio.file.Path
|
||||
import java.nio.charset.Charset
|
||||
|
||||
data class CacheTracingConfig(val enabled: Boolean, val targetDir: Path)
|
||||
val longHash = com.google.common.hash.Hashing.sipHash24()
|
||||
|
||||
fun EnterpriseConfiguration.getTracingConfig(): CacheTracingConfig {
|
||||
return CacheTracingConfig(this.enableCacheTracing, this.traceTargetDirectory)
|
||||
private fun convertObject(key: Any?): Long {
|
||||
if (key == null) {
|
||||
return 0
|
||||
}
|
||||
return longHash.hashString(key.toString(), Charset.defaultCharset()).asLong()
|
||||
}
|
||||
|
||||
|
||||
fun EnterpriseConfiguration.getTracingConfig(): CacheTracing.CacheTracingConfig {
|
||||
return CacheTracing.CacheTracingConfig(this.enableCacheTracing, this.traceTargetDirectory, { convertObject(it) })
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user