diff --git a/.idea/compiler.xml b/.idea/compiler.xml index 4042d5bda3..5a65eea1d8 100644 --- a/.idea/compiler.xml +++ b/.idea/compiler.xml @@ -267,6 +267,7 @@ + diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index b7c1de642d..f66053d054 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -74,6 +74,7 @@ import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.JVMAgentRegistry import net.corda.node.utilities.NamedThreadFactory import net.corda.node.utilities.NodeBuildProperties +import net.corda.node.utilities.profiling.getTracingConfig import net.corda.nodeapi.internal.DevIdentityGenerator import net.corda.nodeapi.internal.NodeInfoAndSigned import net.corda.nodeapi.internal.SignedNodeInfo @@ -906,7 +907,14 @@ abstract class AbstractNode(val configuration: NodeConfiguration, protected open fun makeVaultService(keyManagementService: KeyManagementService, services: ServicesForResolution, database: CordaPersistence): VaultServiceInternal { - return NodeVaultService(platformClock, keyManagementService, services, database, schemaService, configuration.transactionCacheSizeBytes) + return NodeVaultService( + platformClock, + keyManagementService, + services, + database, + schemaService, + configuration.transactionCacheSizeBytes, + configuration.enterpriseConfiguration.getTracingConfig()) } /** Load configured JVM agents */ diff --git a/node/src/main/kotlin/net/corda/node/services/config/EnterpriseConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/EnterpriseConfiguration.kt index c65c84465d..c56dd68a76 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/EnterpriseConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/EnterpriseConfiguration.kt @@ -10,13 +10,18 @@ package net.corda.node.services.config +import java.io.File import java.net.InetAddress +import java.nio.file.Path data class EnterpriseConfiguration( val mutualExclusionConfiguration: MutualExclusionConfiguration, val useMultiThreadedSMM: Boolean = true, val tuning: PerformanceTuning = PerformanceTuning.default, - val externalBridge: Boolean? = null) + val externalBridge: Boolean? = null, + val enableCacheTracing: Boolean = false, + val traceTargetDirectory: Path = File(".").toPath() +) data class MutualExclusionConfiguration(val on: Boolean = false, val machineName: String = defaultMachineName, diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt index 41648e3488..3c761c7a82 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -87,7 +87,6 @@ interface NodeConfiguration : NodeSSLConfiguration { val flowMonitorSuspensionLoggingThresholdMillis: Duration get() = DEFAULT_FLOW_MONITOR_SUSPENSION_LOGGING_THRESHOLD_MILLIS val cordappDirectories: List get() = listOf(baseDirectory / CORDAPPS_DIR_NAME_DEFAULT) val jmxReporterType : JmxReporterType? get() = defaultJmxReporterType - fun validate(): List companion object { diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index 15dd5f9ef9..c665e4c51c 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -34,6 +34,8 @@ import net.corda.node.services.config.KB import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.schema.PersistentStateService import net.corda.node.services.statemachine.FlowStateMachineImpl +import net.corda.node.utilities.profiling.CacheTracing.Companion.wrap +import net.corda.node.utilities.profiling.CacheTracingConfig import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit import net.corda.nodeapi.internal.persistence.currentDBSession @@ -70,7 +72,8 @@ class NodeVaultService( private val servicesForResolution: ServicesForResolution, private val database: CordaPersistence, private val schemaService: SchemaService, - transactionCacheSizeBytes: Long = NodeConfiguration.defaultTransactionCacheSize + transactionCacheSizeBytes: Long = NodeConfiguration.defaultTransactionCacheSize, + cacheTraceConfig: CacheTracingConfig? = null ) : SingletonSerializeAsToken(), VaultServiceInternal { private companion object { private val log = contextLogger() @@ -99,7 +102,14 @@ class NodeVaultService( * This caches what states are in the vault for a particular transaction. Size the cache based on one entry per 8KB of transaction cache. * This size results in minimum of 1024. */ - private val producedStatesMapping = Caffeine.newBuilder().maximumSize(transactionCacheSizeBytes / 8.KB).build() + private val producedStatesMapping = wrap( + Caffeine.newBuilder().maximumSize(transactionCacheSizeBytes / 8.KB).build(), + converter = { key: SecureHash -> longHash.hashBytes(key.bytes).asLong() }, + config = cacheTraceConfig, + traceName = "vaulteservice" + ) + + private val longHash = com.google.common.hash.Hashing.sipHash24() override fun start() { criteriaBuilder = database.hibernateConfig.sessionFactoryForRegisteredSchemas.criteriaBuilder diff --git a/node/src/main/kotlin/net/corda/node/utilities/profiling/CacheTracingConfig.kt b/node/src/main/kotlin/net/corda/node/utilities/profiling/CacheTracingConfig.kt new file mode 100644 index 0000000000..a73f851bf6 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/utilities/profiling/CacheTracingConfig.kt @@ -0,0 +1,20 @@ +/* + * R3 Proprietary and Confidential + * + * Copyright (c) 2018 R3 Limited. All rights reserved. + * + * The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law. + * + * Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. + */ + +package net.corda.node.utilities.profiling + +import net.corda.node.services.config.EnterpriseConfiguration +import java.nio.file.Path + +data class CacheTracingConfig(val enabled: Boolean, val targetDir: Path) + +fun EnterpriseConfiguration.getTracingConfig(): CacheTracingConfig { + return CacheTracingConfig(this.enableCacheTracing, this.traceTargetDirectory) +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/utilities/profiling/TracingCacheWrapper.kt b/node/src/main/kotlin/net/corda/node/utilities/profiling/TracingCacheWrapper.kt new file mode 100644 index 0000000000..a10c86d74f --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/utilities/profiling/TracingCacheWrapper.kt @@ -0,0 +1,130 @@ +/* + * R3 Proprietary and Confidential + * + * Copyright (c) 2018 R3 Limited. All rights reserved. + * + * The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law. + * + * Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. + */ + +package net.corda.node.utilities.profiling + +import com.github.benmanes.caffeine.cache.Cache +import com.github.benmanes.caffeine.cache.LoadingCache +import com.google.common.primitives.Longs +import net.corda.core.utilities.seconds +import net.corda.nodeapi.internal.addShutdownHook +import java.io.File +import java.io.FileOutputStream +import java.nio.file.Path +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.function.Function +import kotlin.concurrent.thread + +class CacheTracing { + private class TraceCollector(private val fileName: String, private val converter: (key: K) -> Long) { + @Volatile + private var running = true + private val queue = ConcurrentLinkedQueue() + private val writeThread = thread(start = true, name = "TraceWriter-$fileName", isDaemon = true) { writeFunc() } + + fun collectKeys(keys: Iterable) { + if (!running) { + return + } + keys.forEach { queue.add(converter(it)) } + } + + fun shutdown() { + running = false + writeThread.join(10.seconds.toMillis()) + } + + private fun writeFunc() { + val file = File(fileName) + if (!file.parentFile.exists()) { + file.parentFile.mkdirs() + } + FileOutputStream(fileName, true).use { + var firstRun = true // make sure the loop runs at least once (in case of very short lived process where the thread might be started after shutdown is initiated. + while (running || firstRun) { + Thread.sleep(100) // sleep first, then check for work (so that work arriving during sleep does not get lost in shutdown) + var item: Long? = null + while ({ item = queue.poll(); item }() != null) { + it.write(Longs.toByteArray(item!!)) + } + firstRun = false + } + } + } + } + + private open class TracingCacheWrapper(protected val cache: Cache, protected val collector: TraceCollector) : Cache by cache { + override fun put(key: K, value: V) { + collector.collectKeys(listOf(key)) + cache.put(key, value) + } + + override fun putAll(map: MutableMap) { + collector.collectKeys(map.keys) + cache.putAll(map) + } + + override fun get(key: K, mappingFunction: Function): V? { + collector.collectKeys(listOf(key)) + return cache.get(key, mappingFunction) + } + + override fun getIfPresent(key: Any): V? { + @Suppress("UNCHECKED_CAST") // need to suppress this warning - no way to check against an erased type + collector.collectKeys(listOf(key as K)) + return cache.getIfPresent(key) + } + } + + private class TracingLoadingCacheWrapper(val loadingCache: LoadingCache, collector: TraceCollector) : LoadingCache, TracingCacheWrapper(loadingCache, collector) { + override fun getAll(keys: MutableIterable): MutableMap { + collector.collectKeys(keys) + return loadingCache.getAll(keys) + } + + override fun refresh(key: K) { + collector.collectKeys(listOf(key)) + loadingCache.refresh(key) + } + + override fun get(key: K): V? { + collector.collectKeys(listOf(key)) + return loadingCache.get(key) + } + } + + companion object { + fun wrap(cache: Cache, converter: (key: K) -> Long, config: CacheTracingConfig?, traceName: String): Cache { + return if (config != null && config.enabled) TracingCacheWrapper(cache, getCollector(config.targetDir, traceName, converter)) else cache + } + + fun wrap(cache: LoadingCache, converter: (key: K) -> Long, config: CacheTracingConfig?, traceName: String): LoadingCache { + return if (config != null && config.enabled) TracingLoadingCacheWrapper(cache, getCollector(config.targetDir, traceName, converter)) else cache + } + + private val collectors = ConcurrentHashMap>() + + @Synchronized + private fun getCollector(targetDir: Path, traceName: String, converter: (key: K) -> Long): TraceCollector { + if (collectors.isEmpty()) { + addShutdownHook { shutdown() } + } + val fileName = targetDir.resolve("trace_$traceName.bin").toAbsolutePath().toString() + @Suppress("UNCHECKED_CAST") // need to suppress this warning - no way to check against an erased type + return collectors.computeIfAbsent(fileName) { TraceCollector(it, converter) } as TraceCollector + } + + fun shutdown() { + collectors.values.forEach { it.shutdown() } + collectors.clear() + } + } +} \ No newline at end of file diff --git a/node/src/main/resources/reference.conf b/node/src/main/resources/reference.conf index ea933ce79e..d58d2578df 100644 --- a/node/src/main/resources/reference.conf +++ b/node/src/main/resources/reference.conf @@ -39,6 +39,8 @@ enterpriseConfiguration = { brokerConnectionTtlCheckIntervalMs = 20 } useMultiThreadedSMM = true + enableCacheTracing = false + traceTargetDirectory = ${baseDirectory}"/logs/traces" } rpcSettings = { useSsl = false diff --git a/node/src/test/kotlin/net/corda/node/utilities/profiling/CacheTracingTest.kt b/node/src/test/kotlin/net/corda/node/utilities/profiling/CacheTracingTest.kt new file mode 100644 index 0000000000..1c5808f0d7 --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/utilities/profiling/CacheTracingTest.kt @@ -0,0 +1,111 @@ +/* + * R3 Proprietary and Confidential + * + * Copyright (c) 2018 R3 Limited. All rights reserved. + * + * The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law. + * + * Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. + */ + +package net.corda.node.utilities.profiling + + +import com.github.benmanes.caffeine.cache.Caffeine +import com.google.common.primitives.Longs +import net.corda.node.utilities.profiling.CacheTracing.Companion.wrap +import org.junit.Test +import java.io.FileInputStream +import kotlin.test.assertEquals + +class CacheTracingTest { + @Test + fun testEverythingGetsCaptured() { + val tempDir = createTempDir() + val cache = Caffeine.newBuilder().maximumSize(10).build() + + val wrappedCache = wrap(cache, { key: Long -> key }, CacheTracingConfig(true, tempDir.toPath()), "test") + + wrappedCache.put(1L, 1L) + wrappedCache.putAll(mutableMapOf(2L to 2L, 3L to 3L)) + wrappedCache.get(4) { it } + wrappedCache.getIfPresent(5) + + CacheTracing.shutdown() + + val fileName = tempDir.toPath().resolve("trace_test.bin") + val inStream = FileInputStream(fileName.toFile()) + checkSequence(listOf(1L, 2L, 3L, 4L, 5L), inStream) + } + + @Test + fun testEverythingGetsCapturedInDirectoryToBeCreated() { + val tempDir = createTempDir() + val cache = Caffeine.newBuilder().maximumSize(10).build() + + val wrappedCache = wrap(cache, { key: Long -> key }, CacheTracingConfig(true, tempDir.toPath().resolve("foo/bar")), "test") + + wrappedCache.put(1L, 1L) + wrappedCache.putAll(mutableMapOf(2L to 2L, 3L to 3L)) + wrappedCache.get(4) { it } + wrappedCache.getIfPresent(5) + + CacheTracing.shutdown() + + val fileName = tempDir.toPath().resolve("foo/bar/trace_test.bin") + val inStream = FileInputStream(fileName.toFile()) + checkSequence(listOf(1L, 2L, 3L, 4L, 5L), inStream) + } + + + @Test + fun testStopsWorkingAfterShutdown() { + val tempDir = createTempDir() + val cache = Caffeine.newBuilder().maximumSize(10).build() + + val wrappedCache = wrap(cache, { key: Long -> key }, CacheTracingConfig(true, tempDir.toPath()), "test") + + wrappedCache.put(1L, 1L) + CacheTracing.shutdown() + + wrappedCache.putAll(mutableMapOf(2L to 2L, 3L to 3L)) + CacheTracing.shutdown() + + val fileName = tempDir.toPath().resolve("trace_test.bin") + val inStream = FileInputStream(fileName.toFile()) + checkSequence(listOf(1L), inStream) + } + + + @Test + fun testEverythingGetsCapturedLoadingCache() { + val tempDir = createTempDir() + val cache = Caffeine.newBuilder().maximumSize(10).build { it } + + val wrappedCache = wrap(cache, { key: Long -> key }, CacheTracingConfig(true, tempDir.toPath()), "test") + + wrappedCache.put(1L, 1L) + wrappedCache.putAll(mutableMapOf(2L to 2L, 3L to 3L)) + wrappedCache.get(4) + wrappedCache.getIfPresent(5) + wrappedCache.getAll(listOf(1, 3)) + wrappedCache.refresh(3) + + CacheTracing.shutdown() + + val fileName = tempDir.toPath().resolve("trace_test.bin") + val inStream = FileInputStream(fileName.toFile()) + checkSequence(listOf(1L, 2L, 3L, 4L, 5L, 1L, 3L, 3L), inStream) + } + + private fun checkSequence(expected: Iterable, stream: FileInputStream) { + val bytes = ByteArray(8) + + expected.forEachIndexed { ind: Int, exp: Long -> + assertEquals(8, stream.read(bytes)) + val actual = Longs.fromByteArray(bytes) + assertEquals(exp, actual, "Expected $exp, got $actual at positions $ind") + } + assertEquals(-1, stream.read(bytes)) + } +} \ No newline at end of file