From 8d5611853a7a3ecf587b15bb7b7f76bb7c417090 Mon Sep 17 00:00:00 2001 From: Christian Sailer Date: Mon, 22 Jan 2018 13:41:06 +0000 Subject: [PATCH] CORDA-929 Attachment caching (#2372) * ENT-1403 Cache node attachments (and attachment content) * ENT-1403 Make cache sizes configurable * Update documentation with new config parameters * Test that non-existence of attachments is not cached * Remove unneeded defaults in interface * It turned out we need the defaults on the interface in quite a few tests * Codereview: typos, size in MB rather than bytes, charset in tests, move concurrencyLevel to a constant * Codereview: Make the internal config value bytes again, but config file in MB * Fix example config unit test --- docs/source/corda-configuration-file.rst | 11 ++- .../net/corda/docs/ExampleConfigTest.kt | 9 ++- .../net/corda/node/internal/AbstractNode.kt | 7 +- .../node/services/config/NodeConfiguration.kt | 16 +++- .../persistence/NodeAttachmentService.kt | 75 ++++++++++++++++++- .../node/utilities/NonInvalidatingCache.kt | 4 +- .../persistence/NodeAttachmentStorageTest.kt | 48 ++++++++++-- 7 files changed, 151 insertions(+), 19 deletions(-) diff --git a/docs/source/corda-configuration-file.rst b/docs/source/corda-configuration-file.rst index 79ce625b0d..1a81da1e3f 100644 --- a/docs/source/corda-configuration-file.rst +++ b/docs/source/corda-configuration-file.rst @@ -170,4 +170,13 @@ path to the node's base directory. Default Jolokia access url is http://127.0.0.1:7005/jolokia/ :useAMQPBridges: Optionally can be set to ``false`` to use Artemis CORE Bridges for peer-to-peer communications. - Otherwise, defaults to ``true`` and the AMQP 1.0 protocol will be used for message transfer between nodes. \ No newline at end of file + Otherwise, defaults to ``true`` and the AMQP 1.0 protocol will be used for message transfer between nodes. + +:transactionCacheSizeMegaBytes: Optionally specify how much memory should be used for caching of ledger transactions in memory. + Otherwise defaults to 8MB plus 5% of all heap memory above 300MB. + +:attachmentContentCacheSizeMegaBytes: Optionally specify how much memory should be used to cache attachment contents in memory. + Otherwise defaults to 10MB + +:attachmentCacheBound: Optionally specify how many attachments should be cached locally. Note that this includes only the key and + metadata, the content is cached separately and can be loaded lazily. Defaults to 1024. \ No newline at end of file diff --git a/docs/source/example-code/src/test/kotlin/net/corda/docs/ExampleConfigTest.kt b/docs/source/example-code/src/test/kotlin/net/corda/docs/ExampleConfigTest.kt index edefef85f7..bf5cb49b1d 100644 --- a/docs/source/example-code/src/test/kotlin/net/corda/docs/ExampleConfigTest.kt +++ b/docs/source/example-code/src/test/kotlin/net/corda/docs/ExampleConfigTest.kt @@ -1,12 +1,15 @@ package net.corda.docs import net.corda.node.services.config.ConfigHelper +import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.parseAsNodeConfiguration import net.corda.verifier.Verifier import org.junit.Test import java.nio.file.Path import java.nio.file.Paths +import kotlin.reflect.KVisibility import kotlin.reflect.full.declaredMemberProperties +import kotlin.reflect.jvm.isAccessible class ExampleConfigTest { @@ -17,14 +20,16 @@ class ExampleConfigTest { val config = loadConfig(Paths.get(configFileResource.toURI())) // Force the config fields as they are resolved lazily config.javaClass.kotlin.declaredMemberProperties.forEach { member -> - member.get(config) + if (member.visibility == KVisibility.PUBLIC) { + member.get(config) + } } } } @Test fun `example node_confs parses fine`() { - readAndCheckConfigurations( + readAndCheckConfigurations( "example-node.conf", "example-out-of-process-verifier-node.conf", "example-network-map-node.conf" diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 6280ed522a..5affcadaf8 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -36,10 +36,7 @@ import net.corda.node.services.ContractUpgradeHandler import net.corda.node.services.FinalityHandler import net.corda.node.services.NotaryChangeHandler import net.corda.node.services.api.* -import net.corda.node.services.config.BFTSMaRtConfiguration -import net.corda.node.services.config.NodeConfiguration -import net.corda.node.services.config.NotaryConfig -import net.corda.node.services.config.configureWithDevSSLCertificate +import net.corda.node.services.config.* import net.corda.node.services.events.NodeSchedulerService import net.corda.node.services.events.ScheduledActivityObserver import net.corda.node.services.identity.PersistentIdentityService @@ -536,7 +533,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, private fun makeServices(keyPairs: Set, schemaService: SchemaService, transactionStorage: WritableTransactionStorage, database: CordaPersistence, info: NodeInfo, identityService: IdentityServiceInternal, networkMapCache: NetworkMapCacheInternal): MutableList { checkpointStorage = DBCheckpointStorage() val metrics = MetricRegistry() - attachments = NodeAttachmentService(metrics) + attachments = NodeAttachmentService(metrics, configuration.attachmentContentCacheSizeBytes, configuration.attachmentCacheBound) val cordappProvider = CordappProviderImpl(cordappLoader, attachments) val keyManagementService = makeKeyManagementService(identityService, keyPairs) _services = ServiceHubInternalImpl( diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt index 61a7da64ab..0868682faa 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -14,6 +14,7 @@ import java.net.URL import java.nio.file.Path import java.util.* + val Int.MB: Long get() = this * 1024L * 1024L interface NodeConfiguration : NodeSSLConfiguration { @@ -42,6 +43,9 @@ interface NodeConfiguration : NodeSSLConfiguration { val database: DatabaseConfig val useAMQPBridges: Boolean get() = true val transactionCacheSizeBytes: Long get() = defaultTransactionCacheSize + val attachmentContentCacheSizeBytes: Long get() = defaultAttachmentContentCacheSize + val attachmentCacheBound: Long get() = defaultAttachmentCacheBound + companion object { // default to at least 8MB and a bit extra for larger heap sizes @@ -51,6 +55,9 @@ interface NodeConfiguration : NodeSSLConfiguration { private fun getAdditionalCacheMemory(): Long { return Math.max((Runtime.getRuntime().maxMemory() - 300.MB) / 20, 0) } + + val defaultAttachmentContentCacheSize: Long = 10.MB + val defaultAttachmentCacheBound = 1024L } } @@ -127,10 +134,17 @@ data class NodeConfigurationImpl( override val sshd: SSHDConfiguration? = null, override val database: DatabaseConfig = DatabaseConfig(initialiseSchema = devMode, exportHibernateJMXStatistics = devMode), override val useAMQPBridges: Boolean = true, - override val transactionCacheSizeBytes: Long = NodeConfiguration.defaultTransactionCacheSize + private val transactionCacheSizeMegaBytes: Int? = null, + private val attachmentContentCacheSizeMegaBytes: Int? = null, + override val attachmentCacheBound: Long = NodeConfiguration.defaultAttachmentCacheBound ) : NodeConfiguration { override val exportJMXto: String get() = "http" + override val transactionCacheSizeBytes: Long + get() = transactionCacheSizeMegaBytes?.MB ?: super.transactionCacheSizeBytes + override val attachmentContentCacheSizeBytes: Long + get() = attachmentContentCacheSizeMegaBytes?.MB ?: super.attachmentContentCacheSizeBytes + init { // This is a sanity feature do not remove. diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/NodeAttachmentService.kt b/node/src/main/kotlin/net/corda/node/services/persistence/NodeAttachmentService.kt index 60c5eb373a..784499509d 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/NodeAttachmentService.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/NodeAttachmentService.kt @@ -1,6 +1,7 @@ package net.corda.node.services.persistence import com.codahale.metrics.MetricRegistry +import com.google.common.cache.Weigher import com.google.common.hash.HashCode import com.google.common.hash.Hashing import com.google.common.hash.HashingInputStream @@ -16,12 +17,17 @@ import net.corda.core.node.services.vault.AttachmentQueryCriteria import net.corda.core.node.services.vault.AttachmentSort import net.corda.core.serialization.* import net.corda.core.utilities.contextLogger +import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.vault.HibernateAttachmentQueryCriteriaParser +import net.corda.node.utilities.NonInvalidatingCache +import net.corda.node.utilities.NonInvalidatingWeightBasedCache +import net.corda.node.utilities.defaultCordaCacheConcurrencyLevel import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX import net.corda.nodeapi.internal.persistence.currentDBSession import java.io.* import java.nio.file.Paths import java.time.Instant +import java.util.* import java.util.jar.JarInputStream import javax.annotation.concurrent.ThreadSafe import javax.persistence.* @@ -30,7 +36,12 @@ import javax.persistence.* * Stores attachments using Hibernate to database. */ @ThreadSafe -class NodeAttachmentService(metrics: MetricRegistry) : AttachmentStorage, SingletonSerializeAsToken() { +class NodeAttachmentService( + metrics: MetricRegistry, + attachmentContentCacheSize: Long = NodeConfiguration.defaultAttachmentContentCacheSize, + attachmentCacheBound: Long = NodeConfiguration.defaultAttachmentCacheBound +) : AttachmentStorage, SingletonSerializeAsToken( +) { companion object { private val log = contextLogger() @@ -172,11 +183,67 @@ class NodeAttachmentService(metrics: MetricRegistry) : AttachmentStorage, Single } - override fun openAttachment(id: SecureHash): Attachment? { + + // slightly complex 2 level approach to attachment caching: + // On the first level we cache attachment contents loaded from the DB by their key. This is a weight based + // cache (we don't want to waste too much memory on this) and could be evicted quite aggressively. If we fail + // to load an attachment from the db, the loader will insert a non present optional - we invalidate this + // immediately as we definitely want to retry whether the attachment was just delayed. + // On the second level, we cache Attachment implementations that use the first cache to load their content + // when required. As these are fairly small, we can cache quite a lot of them, this will make checking + // repeatedly whether an attachment exists fairly cheap. Here as well, we evict non-existent entries immediately + // to force a recheck if required. + // If repeatedly looking for non-existing attachments becomes a performance issue, this is either indicating a + // a problem somewhere else or this needs to be revisited. + + private val attachmentContentCache = NonInvalidatingWeightBasedCache>( + maxWeight = attachmentContentCacheSize, + concurrencyLevel = defaultCordaCacheConcurrencyLevel, + weigher = object : Weigher> { + override fun weigh(key: SecureHash, value: Optional): Int { + return key.size + if (value.isPresent) value.get().size else 0 + } + }, + loadFunction = { Optional.ofNullable(loadAttachmentContent(it)) } + ) + + private fun loadAttachmentContent(id: SecureHash): ByteArray? { val attachment = currentDBSession().get(NodeAttachmentService.DBAttachment::class.java, id.toString()) - attachment?.let { - return AttachmentImpl(id, { attachment.content }, checkAttachmentsOnLoad) + return attachment?.content + } + + + private val attachmentCache = NonInvalidatingCache>( + attachmentCacheBound, + defaultCordaCacheConcurrencyLevel, + { key -> Optional.ofNullable(createAttachment(key)) } + ) + + private fun createAttachment(key: SecureHash): Attachment? { + val content = attachmentContentCache.get(key) + if (content.isPresent) { + return AttachmentImpl( + key, + { + attachmentContentCache + .get(key) + .orElseThrow { + IllegalArgumentException("No attachement impl should have been created for non existent content") + } + }, + checkAttachmentsOnLoad) } + // if no attachement has been found, we don't want to cache that - it might arrive later + attachmentContentCache.invalidate(key) + return null + } + + override fun openAttachment(id: SecureHash): Attachment? { + val attachment = attachmentCache.get(id) + if (attachment.isPresent) { + return attachment.get() + } + attachmentCache.invalidate(id) return null } diff --git a/node/src/main/kotlin/net/corda/node/utilities/NonInvalidatingCache.kt b/node/src/main/kotlin/net/corda/node/utilities/NonInvalidatingCache.kt index 0a199a0e31..f08a0631cb 100644 --- a/node/src/main/kotlin/net/corda/node/utilities/NonInvalidatingCache.kt +++ b/node/src/main/kotlin/net/corda/node/utilities/NonInvalidatingCache.kt @@ -44,4 +44,6 @@ class NonInvalidatingWeightBasedCache private constructor( return builder.build(NonInvalidatingCache.NonInvalidatingCacheLoader(loadFunction)) } } -} \ No newline at end of file +} + +val defaultCordaCacheConcurrencyLevel: Int = 8 \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/NodeAttachmentStorageTest.kt b/node/src/test/kotlin/net/corda/node/services/persistence/NodeAttachmentStorageTest.kt index 5995f3c177..861a324e78 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/NodeAttachmentStorageTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/NodeAttachmentStorageTest.kt @@ -13,18 +13,18 @@ import net.corda.core.node.services.vault.AttachmentQueryCriteria import net.corda.core.node.services.vault.AttachmentSort import net.corda.core.node.services.vault.Builder import net.corda.core.node.services.vault.Sort -import net.corda.node.services.transactions.PersistentUniquenessProvider import net.corda.node.internal.configureDatabase +import net.corda.node.services.transactions.PersistentUniquenessProvider import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseConfig import net.corda.testing.internal.LogHelper -import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties import net.corda.testing.internal.rigorousMock +import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties import org.junit.After import org.junit.Before import org.junit.Ignore import org.junit.Test -import java.nio.charset.Charset +import java.nio.charset.StandardCharsets import java.nio.file.FileAlreadyExistsException import java.nio.file.FileSystem import java.nio.file.Path @@ -66,10 +66,10 @@ class NodeAttachmentStorageTest { val stream = storage.openAttachment(expectedHash)!!.openAsJAR() val e1 = stream.nextJarEntry!! assertEquals("test1.txt", e1.name) - assertEquals(stream.readBytes().toString(Charset.defaultCharset()), "This is some useful content") + assertEquals(stream.readBytes().toString(StandardCharsets.UTF_8), "This is some useful content") val e2 = stream.nextJarEntry!! assertEquals("test2.txt", e2.name) - assertEquals(stream.readBytes().toString(Charset.defaultCharset()), "Some more useful content") + assertEquals(stream.readBytes().toString(StandardCharsets.UTF_8), "Some more useful content") stream.close() @@ -80,6 +80,44 @@ class NodeAttachmentStorageTest { } } + @Test + fun `missing is not cached`() { + val (testJar, expectedHash) = makeTestJar() + val (jarB, hashB) = makeTestJar(listOf(Pair("file", "content"))) + + database.transaction { + val storage = NodeAttachmentService(MetricRegistry()) + val id = testJar.read { storage.importAttachment(it) } + assertEquals(expectedHash, id) + + + assertNull(storage.openAttachment(hashB)) + val stream = storage.openAttachment(expectedHash)!!.openAsJAR() + val e1 = stream.nextJarEntry!! + assertEquals("test1.txt", e1.name) + assertEquals(stream.readBytes().toString(StandardCharsets.UTF_8), "This is some useful content") + val e2 = stream.nextJarEntry!! + assertEquals("test2.txt", e2.name) + assertEquals(stream.readBytes().toString(StandardCharsets.UTF_8), "Some more useful content") + + stream.close() + + val idB = jarB.read { storage.importAttachment(it) } + assertEquals(hashB, idB) + + storage.openAttachment(id)!!.openAsJAR().use { + it.nextJarEntry + it.readBytes() + } + + storage.openAttachment(idB)!!.openAsJAR().use { + it.nextJarEntry + it.readBytes() + } + } + } + + @Test fun `metadata can be used to search`() { val (jarA, _) = makeTestJar()