ENT-2414 Cache trace capture (#1337)

* Naive implementation of a trace capturing wrapper.

* Thread-safe cache tracing wrapper using a queue

* Use sipHash to get a long representing secure hash

* Code review rework

* Add copyright headers

* Move config back to enterprise config and remove trace name from CacheTracingConfig so it can be injected when the cache is created.
Add defaults to reference.conf
Add code and test to create directories when required.

* Remove empty line

* Sort out writer thread

* Blank line

* Revert: Code review rework (892911a)
This commit is contained in:
Christian Sailer 2018-08-17 17:28:28 +01:00 committed by GitHub
parent 047489ba7e
commit 9a3b1629a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 291 additions and 5 deletions

1
.idea/compiler.xml generated
View File

@ -267,6 +267,7 @@
<module name="shell_test" target="1.8" />
<module name="simm-valuation-demo_integrationTest" target="1.8" />
<module name="simm-valuation-demo_main" target="1.8" />
<module name="simm-valuation-demo_scenario" target="1.8" />
<module name="simm-valuation-demo_scenarioTest" target="1.8" />
<module name="simm-valuation-demo_test" target="1.8" />
<module name="smoke-test-utils_main" target="1.8" />

View File

@ -74,6 +74,7 @@ import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.JVMAgentRegistry
import net.corda.node.utilities.NamedThreadFactory
import net.corda.node.utilities.NodeBuildProperties
import net.corda.node.utilities.profiling.getTracingConfig
import net.corda.nodeapi.internal.DevIdentityGenerator
import net.corda.nodeapi.internal.NodeInfoAndSigned
import net.corda.nodeapi.internal.SignedNodeInfo
@ -906,7 +907,14 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
protected open fun makeVaultService(keyManagementService: KeyManagementService,
services: ServicesForResolution,
database: CordaPersistence): VaultServiceInternal {
return NodeVaultService(platformClock, keyManagementService, services, database, schemaService, configuration.transactionCacheSizeBytes)
return NodeVaultService(
platformClock,
keyManagementService,
services,
database,
schemaService,
configuration.transactionCacheSizeBytes,
configuration.enterpriseConfiguration.getTracingConfig())
}
/** Load configured JVM agents */

View File

@ -10,13 +10,18 @@
package net.corda.node.services.config
import java.io.File
import java.net.InetAddress
import java.nio.file.Path
data class EnterpriseConfiguration(
val mutualExclusionConfiguration: MutualExclusionConfiguration,
val useMultiThreadedSMM: Boolean = true,
val tuning: PerformanceTuning = PerformanceTuning.default,
val externalBridge: Boolean? = null)
val externalBridge: Boolean? = null,
val enableCacheTracing: Boolean = false,
val traceTargetDirectory: Path = File(".").toPath()
)
data class MutualExclusionConfiguration(val on: Boolean = false,
val machineName: String = defaultMachineName,

View File

@ -87,7 +87,6 @@ interface NodeConfiguration : NodeSSLConfiguration {
val flowMonitorSuspensionLoggingThresholdMillis: Duration get() = DEFAULT_FLOW_MONITOR_SUSPENSION_LOGGING_THRESHOLD_MILLIS
val cordappDirectories: List<Path> get() = listOf(baseDirectory / CORDAPPS_DIR_NAME_DEFAULT)
val jmxReporterType : JmxReporterType? get() = defaultJmxReporterType
fun validate(): List<String>
companion object {

View File

@ -34,6 +34,8 @@ import net.corda.node.services.config.KB
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.schema.PersistentStateService
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.utilities.profiling.CacheTracing.Companion.wrap
import net.corda.node.utilities.profiling.CacheTracingConfig
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
import net.corda.nodeapi.internal.persistence.currentDBSession
@ -70,7 +72,8 @@ class NodeVaultService(
private val servicesForResolution: ServicesForResolution,
private val database: CordaPersistence,
private val schemaService: SchemaService,
transactionCacheSizeBytes: Long = NodeConfiguration.defaultTransactionCacheSize
transactionCacheSizeBytes: Long = NodeConfiguration.defaultTransactionCacheSize,
cacheTraceConfig: CacheTracingConfig? = null
) : SingletonSerializeAsToken(), VaultServiceInternal {
private companion object {
private val log = contextLogger()
@ -99,7 +102,14 @@ class NodeVaultService(
* This caches what states are in the vault for a particular transaction. Size the cache based on one entry per 8KB of transaction cache.
* This size results in minimum of 1024.
*/
private val producedStatesMapping = Caffeine.newBuilder().maximumSize(transactionCacheSizeBytes / 8.KB).build<SecureHash, BitSet>()
private val producedStatesMapping = wrap(
Caffeine.newBuilder().maximumSize(transactionCacheSizeBytes / 8.KB).build<SecureHash, BitSet>(),
converter = { key: SecureHash -> longHash.hashBytes(key.bytes).asLong() },
config = cacheTraceConfig,
traceName = "vaulteservice"
)
private val longHash = com.google.common.hash.Hashing.sipHash24()
override fun start() {
criteriaBuilder = database.hibernateConfig.sessionFactoryForRegisteredSchemas.criteriaBuilder

View File

@ -0,0 +1,20 @@
/*
* R3 Proprietary and Confidential
*
* Copyright (c) 2018 R3 Limited. All rights reserved.
*
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
*
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
*/
package net.corda.node.utilities.profiling
import net.corda.node.services.config.EnterpriseConfiguration
import java.nio.file.Path
data class CacheTracingConfig(val enabled: Boolean, val targetDir: Path)
fun EnterpriseConfiguration.getTracingConfig(): CacheTracingConfig {
return CacheTracingConfig(this.enableCacheTracing, this.traceTargetDirectory)
}

View File

@ -0,0 +1,130 @@
/*
* R3 Proprietary and Confidential
*
* Copyright (c) 2018 R3 Limited. All rights reserved.
*
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
*
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
*/
package net.corda.node.utilities.profiling
import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.LoadingCache
import com.google.common.primitives.Longs
import net.corda.core.utilities.seconds
import net.corda.nodeapi.internal.addShutdownHook
import java.io.File
import java.io.FileOutputStream
import java.nio.file.Path
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.function.Function
import kotlin.concurrent.thread
class CacheTracing {
private class TraceCollector<K>(private val fileName: String, private val converter: (key: K) -> Long) {
@Volatile
private var running = true
private val queue = ConcurrentLinkedQueue<Long>()
private val writeThread = thread(start = true, name = "TraceWriter-$fileName", isDaemon = true) { writeFunc() }
fun collectKeys(keys: Iterable<K>) {
if (!running) {
return
}
keys.forEach { queue.add(converter(it)) }
}
fun shutdown() {
running = false
writeThread.join(10.seconds.toMillis())
}
private fun writeFunc() {
val file = File(fileName)
if (!file.parentFile.exists()) {
file.parentFile.mkdirs()
}
FileOutputStream(fileName, true).use {
var firstRun = true // make sure the loop runs at least once (in case of very short lived process where the thread might be started after shutdown is initiated.
while (running || firstRun) {
Thread.sleep(100) // sleep first, then check for work (so that work arriving during sleep does not get lost in shutdown)
var item: Long? = null
while ({ item = queue.poll(); item }() != null) {
it.write(Longs.toByteArray(item!!))
}
firstRun = false
}
}
}
}
private open class TracingCacheWrapper<K, V>(protected val cache: Cache<K, V>, protected val collector: TraceCollector<K>) : Cache<K, V> by cache {
override fun put(key: K, value: V) {
collector.collectKeys(listOf(key))
cache.put(key, value)
}
override fun putAll(map: MutableMap<out K, out V>) {
collector.collectKeys(map.keys)
cache.putAll(map)
}
override fun get(key: K, mappingFunction: Function<in K, out V>): V? {
collector.collectKeys(listOf(key))
return cache.get(key, mappingFunction)
}
override fun getIfPresent(key: Any): V? {
@Suppress("UNCHECKED_CAST") // need to suppress this warning - no way to check against an erased type
collector.collectKeys(listOf(key as K))
return cache.getIfPresent(key)
}
}
private class TracingLoadingCacheWrapper<K, V>(val loadingCache: LoadingCache<K, V>, collector: TraceCollector<K>) : LoadingCache<K, V>, TracingCacheWrapper<K, V>(loadingCache, collector) {
override fun getAll(keys: MutableIterable<K>): MutableMap<K, V> {
collector.collectKeys(keys)
return loadingCache.getAll(keys)
}
override fun refresh(key: K) {
collector.collectKeys(listOf(key))
loadingCache.refresh(key)
}
override fun get(key: K): V? {
collector.collectKeys(listOf(key))
return loadingCache.get(key)
}
}
companion object {
fun <K, V> wrap(cache: Cache<K, V>, converter: (key: K) -> Long, config: CacheTracingConfig?, traceName: String): Cache<K, V> {
return if (config != null && config.enabled) TracingCacheWrapper(cache, getCollector(config.targetDir, traceName, converter)) else cache
}
fun <K, V> wrap(cache: LoadingCache<K, V>, converter: (key: K) -> Long, config: CacheTracingConfig?, traceName: String): LoadingCache<K, V> {
return if (config != null && config.enabled) TracingLoadingCacheWrapper(cache, getCollector(config.targetDir, traceName, converter)) else cache
}
private val collectors = ConcurrentHashMap<String, TraceCollector<*>>()
@Synchronized
private fun <K> getCollector(targetDir: Path, traceName: String, converter: (key: K) -> Long): TraceCollector<K> {
if (collectors.isEmpty()) {
addShutdownHook { shutdown() }
}
val fileName = targetDir.resolve("trace_$traceName.bin").toAbsolutePath().toString()
@Suppress("UNCHECKED_CAST") // need to suppress this warning - no way to check against an erased type
return collectors.computeIfAbsent(fileName) { TraceCollector(it, converter) } as TraceCollector<K>
}
fun shutdown() {
collectors.values.forEach { it.shutdown() }
collectors.clear()
}
}
}

View File

@ -39,6 +39,8 @@ enterpriseConfiguration = {
brokerConnectionTtlCheckIntervalMs = 20
}
useMultiThreadedSMM = true
enableCacheTracing = false
traceTargetDirectory = ${baseDirectory}"/logs/traces"
}
rpcSettings = {
useSsl = false

View File

@ -0,0 +1,111 @@
/*
* R3 Proprietary and Confidential
*
* Copyright (c) 2018 R3 Limited. All rights reserved.
*
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
*
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
*/
package net.corda.node.utilities.profiling
import com.github.benmanes.caffeine.cache.Caffeine
import com.google.common.primitives.Longs
import net.corda.node.utilities.profiling.CacheTracing.Companion.wrap
import org.junit.Test
import java.io.FileInputStream
import kotlin.test.assertEquals
class CacheTracingTest {
@Test
fun testEverythingGetsCaptured() {
val tempDir = createTempDir()
val cache = Caffeine.newBuilder().maximumSize(10).build<Long, Long>()
val wrappedCache = wrap(cache, { key: Long -> key }, CacheTracingConfig(true, tempDir.toPath()), "test")
wrappedCache.put(1L, 1L)
wrappedCache.putAll(mutableMapOf(2L to 2L, 3L to 3L))
wrappedCache.get(4) { it }
wrappedCache.getIfPresent(5)
CacheTracing.shutdown()
val fileName = tempDir.toPath().resolve("trace_test.bin")
val inStream = FileInputStream(fileName.toFile())
checkSequence(listOf(1L, 2L, 3L, 4L, 5L), inStream)
}
@Test
fun testEverythingGetsCapturedInDirectoryToBeCreated() {
val tempDir = createTempDir()
val cache = Caffeine.newBuilder().maximumSize(10).build<Long, Long>()
val wrappedCache = wrap(cache, { key: Long -> key }, CacheTracingConfig(true, tempDir.toPath().resolve("foo/bar")), "test")
wrappedCache.put(1L, 1L)
wrappedCache.putAll(mutableMapOf(2L to 2L, 3L to 3L))
wrappedCache.get(4) { it }
wrappedCache.getIfPresent(5)
CacheTracing.shutdown()
val fileName = tempDir.toPath().resolve("foo/bar/trace_test.bin")
val inStream = FileInputStream(fileName.toFile())
checkSequence(listOf(1L, 2L, 3L, 4L, 5L), inStream)
}
@Test
fun testStopsWorkingAfterShutdown() {
val tempDir = createTempDir()
val cache = Caffeine.newBuilder().maximumSize(10).build<Long, Long>()
val wrappedCache = wrap(cache, { key: Long -> key }, CacheTracingConfig(true, tempDir.toPath()), "test")
wrappedCache.put(1L, 1L)
CacheTracing.shutdown()
wrappedCache.putAll(mutableMapOf(2L to 2L, 3L to 3L))
CacheTracing.shutdown()
val fileName = tempDir.toPath().resolve("trace_test.bin")
val inStream = FileInputStream(fileName.toFile())
checkSequence(listOf(1L), inStream)
}
@Test
fun testEverythingGetsCapturedLoadingCache() {
val tempDir = createTempDir()
val cache = Caffeine.newBuilder().maximumSize(10).build<Long, Long> { it }
val wrappedCache = wrap(cache, { key: Long -> key }, CacheTracingConfig(true, tempDir.toPath()), "test")
wrappedCache.put(1L, 1L)
wrappedCache.putAll(mutableMapOf(2L to 2L, 3L to 3L))
wrappedCache.get(4)
wrappedCache.getIfPresent(5)
wrappedCache.getAll(listOf(1, 3))
wrappedCache.refresh(3)
CacheTracing.shutdown()
val fileName = tempDir.toPath().resolve("trace_test.bin")
val inStream = FileInputStream(fileName.toFile())
checkSequence(listOf(1L, 2L, 3L, 4L, 5L, 1L, 3L, 3L), inStream)
}
private fun checkSequence(expected: Iterable<Long>, stream: FileInputStream) {
val bytes = ByteArray(8)
expected.forEachIndexed { ind: Int, exp: Long ->
assertEquals(8, stream.read(bytes))
val actual = Longs.fromByteArray(bytes)
assertEquals(exp, actual, "Expected $exp, got $actual at positions $ind")
}
assertEquals(-1, stream.read(bytes))
}
}