mirror of
https://github.com/corda/corda.git
synced 2025-05-31 22:50:53 +00:00
CORDA-929 Attachment caching (#2372)
* ENT-1403 Cache node attachments (and attachment content) * ENT-1403 Make cache sizes configurable * Update documentation with new config parameters * Test that non-existence of attachments is not cached * Remove unneeded defaults in interface * It turned out we need the defaults on the interface in quite a few tests * Codereview: typos, size in MB rather than bytes, charset in tests, move concurrencyLevel to a constant * Codereview: Make the internal config value bytes again, but config file in MB * Fix example config unit test
This commit is contained in:
parent
4a3379ac8a
commit
8d5611853a
@ -171,3 +171,12 @@ path to the node's base directory.
|
|||||||
|
|
||||||
:useAMQPBridges: Optionally can be set to ``false`` to use Artemis CORE Bridges for peer-to-peer communications.
|
:useAMQPBridges: Optionally can be set to ``false`` to use Artemis CORE Bridges for peer-to-peer communications.
|
||||||
Otherwise, defaults to ``true`` and the AMQP 1.0 protocol will be used for message transfer between nodes.
|
Otherwise, defaults to ``true`` and the AMQP 1.0 protocol will be used for message transfer between nodes.
|
||||||
|
|
||||||
|
:transactionCacheSizeMegaBytes: Optionally specify how much memory should be used for caching of ledger transactions in memory.
|
||||||
|
Otherwise defaults to 8MB plus 5% of all heap memory above 300MB.
|
||||||
|
|
||||||
|
:attachmentContentCacheSizeMegaBytes: Optionally specify how much memory should be used to cache attachment contents in memory.
|
||||||
|
Otherwise defaults to 10MB
|
||||||
|
|
||||||
|
:attachmentCacheBound: Optionally specify how many attachments should be cached locally. Note that this includes only the key and
|
||||||
|
metadata, the content is cached separately and can be loaded lazily. Defaults to 1024.
|
@ -1,12 +1,15 @@
|
|||||||
package net.corda.docs
|
package net.corda.docs
|
||||||
|
|
||||||
import net.corda.node.services.config.ConfigHelper
|
import net.corda.node.services.config.ConfigHelper
|
||||||
|
import net.corda.node.services.config.NodeConfiguration
|
||||||
import net.corda.node.services.config.parseAsNodeConfiguration
|
import net.corda.node.services.config.parseAsNodeConfiguration
|
||||||
import net.corda.verifier.Verifier
|
import net.corda.verifier.Verifier
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
import java.nio.file.Paths
|
import java.nio.file.Paths
|
||||||
|
import kotlin.reflect.KVisibility
|
||||||
import kotlin.reflect.full.declaredMemberProperties
|
import kotlin.reflect.full.declaredMemberProperties
|
||||||
|
import kotlin.reflect.jvm.isAccessible
|
||||||
|
|
||||||
class ExampleConfigTest {
|
class ExampleConfigTest {
|
||||||
|
|
||||||
@ -17,14 +20,16 @@ class ExampleConfigTest {
|
|||||||
val config = loadConfig(Paths.get(configFileResource.toURI()))
|
val config = loadConfig(Paths.get(configFileResource.toURI()))
|
||||||
// Force the config fields as they are resolved lazily
|
// Force the config fields as they are resolved lazily
|
||||||
config.javaClass.kotlin.declaredMemberProperties.forEach { member ->
|
config.javaClass.kotlin.declaredMemberProperties.forEach { member ->
|
||||||
|
if (member.visibility == KVisibility.PUBLIC) {
|
||||||
member.get(config)
|
member.get(config)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `example node_confs parses fine`() {
|
fun `example node_confs parses fine`() {
|
||||||
readAndCheckConfigurations(
|
readAndCheckConfigurations<NodeConfiguration>(
|
||||||
"example-node.conf",
|
"example-node.conf",
|
||||||
"example-out-of-process-verifier-node.conf",
|
"example-out-of-process-verifier-node.conf",
|
||||||
"example-network-map-node.conf"
|
"example-network-map-node.conf"
|
||||||
|
@ -36,10 +36,7 @@ import net.corda.node.services.ContractUpgradeHandler
|
|||||||
import net.corda.node.services.FinalityHandler
|
import net.corda.node.services.FinalityHandler
|
||||||
import net.corda.node.services.NotaryChangeHandler
|
import net.corda.node.services.NotaryChangeHandler
|
||||||
import net.corda.node.services.api.*
|
import net.corda.node.services.api.*
|
||||||
import net.corda.node.services.config.BFTSMaRtConfiguration
|
import net.corda.node.services.config.*
|
||||||
import net.corda.node.services.config.NodeConfiguration
|
|
||||||
import net.corda.node.services.config.NotaryConfig
|
|
||||||
import net.corda.node.services.config.configureWithDevSSLCertificate
|
|
||||||
import net.corda.node.services.events.NodeSchedulerService
|
import net.corda.node.services.events.NodeSchedulerService
|
||||||
import net.corda.node.services.events.ScheduledActivityObserver
|
import net.corda.node.services.events.ScheduledActivityObserver
|
||||||
import net.corda.node.services.identity.PersistentIdentityService
|
import net.corda.node.services.identity.PersistentIdentityService
|
||||||
@ -536,7 +533,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
|||||||
private fun makeServices(keyPairs: Set<KeyPair>, schemaService: SchemaService, transactionStorage: WritableTransactionStorage, database: CordaPersistence, info: NodeInfo, identityService: IdentityServiceInternal, networkMapCache: NetworkMapCacheInternal): MutableList<Any> {
|
private fun makeServices(keyPairs: Set<KeyPair>, schemaService: SchemaService, transactionStorage: WritableTransactionStorage, database: CordaPersistence, info: NodeInfo, identityService: IdentityServiceInternal, networkMapCache: NetworkMapCacheInternal): MutableList<Any> {
|
||||||
checkpointStorage = DBCheckpointStorage()
|
checkpointStorage = DBCheckpointStorage()
|
||||||
val metrics = MetricRegistry()
|
val metrics = MetricRegistry()
|
||||||
attachments = NodeAttachmentService(metrics)
|
attachments = NodeAttachmentService(metrics, configuration.attachmentContentCacheSizeBytes, configuration.attachmentCacheBound)
|
||||||
val cordappProvider = CordappProviderImpl(cordappLoader, attachments)
|
val cordappProvider = CordappProviderImpl(cordappLoader, attachments)
|
||||||
val keyManagementService = makeKeyManagementService(identityService, keyPairs)
|
val keyManagementService = makeKeyManagementService(identityService, keyPairs)
|
||||||
_services = ServiceHubInternalImpl(
|
_services = ServiceHubInternalImpl(
|
||||||
|
@ -14,6 +14,7 @@ import java.net.URL
|
|||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
import java.util.*
|
import java.util.*
|
||||||
|
|
||||||
|
|
||||||
val Int.MB: Long get() = this * 1024L * 1024L
|
val Int.MB: Long get() = this * 1024L * 1024L
|
||||||
|
|
||||||
interface NodeConfiguration : NodeSSLConfiguration {
|
interface NodeConfiguration : NodeSSLConfiguration {
|
||||||
@ -42,6 +43,9 @@ interface NodeConfiguration : NodeSSLConfiguration {
|
|||||||
val database: DatabaseConfig
|
val database: DatabaseConfig
|
||||||
val useAMQPBridges: Boolean get() = true
|
val useAMQPBridges: Boolean get() = true
|
||||||
val transactionCacheSizeBytes: Long get() = defaultTransactionCacheSize
|
val transactionCacheSizeBytes: Long get() = defaultTransactionCacheSize
|
||||||
|
val attachmentContentCacheSizeBytes: Long get() = defaultAttachmentContentCacheSize
|
||||||
|
val attachmentCacheBound: Long get() = defaultAttachmentCacheBound
|
||||||
|
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
// default to at least 8MB and a bit extra for larger heap sizes
|
// default to at least 8MB and a bit extra for larger heap sizes
|
||||||
@ -51,6 +55,9 @@ interface NodeConfiguration : NodeSSLConfiguration {
|
|||||||
private fun getAdditionalCacheMemory(): Long {
|
private fun getAdditionalCacheMemory(): Long {
|
||||||
return Math.max((Runtime.getRuntime().maxMemory() - 300.MB) / 20, 0)
|
return Math.max((Runtime.getRuntime().maxMemory() - 300.MB) / 20, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
val defaultAttachmentContentCacheSize: Long = 10.MB
|
||||||
|
val defaultAttachmentCacheBound = 1024L
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -127,10 +134,17 @@ data class NodeConfigurationImpl(
|
|||||||
override val sshd: SSHDConfiguration? = null,
|
override val sshd: SSHDConfiguration? = null,
|
||||||
override val database: DatabaseConfig = DatabaseConfig(initialiseSchema = devMode, exportHibernateJMXStatistics = devMode),
|
override val database: DatabaseConfig = DatabaseConfig(initialiseSchema = devMode, exportHibernateJMXStatistics = devMode),
|
||||||
override val useAMQPBridges: Boolean = true,
|
override val useAMQPBridges: Boolean = true,
|
||||||
override val transactionCacheSizeBytes: Long = NodeConfiguration.defaultTransactionCacheSize
|
private val transactionCacheSizeMegaBytes: Int? = null,
|
||||||
|
private val attachmentContentCacheSizeMegaBytes: Int? = null,
|
||||||
|
override val attachmentCacheBound: Long = NodeConfiguration.defaultAttachmentCacheBound
|
||||||
) : NodeConfiguration {
|
) : NodeConfiguration {
|
||||||
|
|
||||||
override val exportJMXto: String get() = "http"
|
override val exportJMXto: String get() = "http"
|
||||||
|
override val transactionCacheSizeBytes: Long
|
||||||
|
get() = transactionCacheSizeMegaBytes?.MB ?: super.transactionCacheSizeBytes
|
||||||
|
override val attachmentContentCacheSizeBytes: Long
|
||||||
|
get() = attachmentContentCacheSizeMegaBytes?.MB ?: super.attachmentContentCacheSizeBytes
|
||||||
|
|
||||||
|
|
||||||
init {
|
init {
|
||||||
// This is a sanity feature do not remove.
|
// This is a sanity feature do not remove.
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package net.corda.node.services.persistence
|
package net.corda.node.services.persistence
|
||||||
|
|
||||||
import com.codahale.metrics.MetricRegistry
|
import com.codahale.metrics.MetricRegistry
|
||||||
|
import com.google.common.cache.Weigher
|
||||||
import com.google.common.hash.HashCode
|
import com.google.common.hash.HashCode
|
||||||
import com.google.common.hash.Hashing
|
import com.google.common.hash.Hashing
|
||||||
import com.google.common.hash.HashingInputStream
|
import com.google.common.hash.HashingInputStream
|
||||||
@ -16,12 +17,17 @@ import net.corda.core.node.services.vault.AttachmentQueryCriteria
|
|||||||
import net.corda.core.node.services.vault.AttachmentSort
|
import net.corda.core.node.services.vault.AttachmentSort
|
||||||
import net.corda.core.serialization.*
|
import net.corda.core.serialization.*
|
||||||
import net.corda.core.utilities.contextLogger
|
import net.corda.core.utilities.contextLogger
|
||||||
|
import net.corda.node.services.config.NodeConfiguration
|
||||||
import net.corda.node.services.vault.HibernateAttachmentQueryCriteriaParser
|
import net.corda.node.services.vault.HibernateAttachmentQueryCriteriaParser
|
||||||
|
import net.corda.node.utilities.NonInvalidatingCache
|
||||||
|
import net.corda.node.utilities.NonInvalidatingWeightBasedCache
|
||||||
|
import net.corda.node.utilities.defaultCordaCacheConcurrencyLevel
|
||||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||||
import net.corda.nodeapi.internal.persistence.currentDBSession
|
import net.corda.nodeapi.internal.persistence.currentDBSession
|
||||||
import java.io.*
|
import java.io.*
|
||||||
import java.nio.file.Paths
|
import java.nio.file.Paths
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
|
import java.util.*
|
||||||
import java.util.jar.JarInputStream
|
import java.util.jar.JarInputStream
|
||||||
import javax.annotation.concurrent.ThreadSafe
|
import javax.annotation.concurrent.ThreadSafe
|
||||||
import javax.persistence.*
|
import javax.persistence.*
|
||||||
@ -30,7 +36,12 @@ import javax.persistence.*
|
|||||||
* Stores attachments using Hibernate to database.
|
* Stores attachments using Hibernate to database.
|
||||||
*/
|
*/
|
||||||
@ThreadSafe
|
@ThreadSafe
|
||||||
class NodeAttachmentService(metrics: MetricRegistry) : AttachmentStorage, SingletonSerializeAsToken() {
|
class NodeAttachmentService(
|
||||||
|
metrics: MetricRegistry,
|
||||||
|
attachmentContentCacheSize: Long = NodeConfiguration.defaultAttachmentContentCacheSize,
|
||||||
|
attachmentCacheBound: Long = NodeConfiguration.defaultAttachmentCacheBound
|
||||||
|
) : AttachmentStorage, SingletonSerializeAsToken(
|
||||||
|
) {
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
private val log = contextLogger()
|
private val log = contextLogger()
|
||||||
@ -172,11 +183,67 @@ class NodeAttachmentService(metrics: MetricRegistry) : AttachmentStorage, Single
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun openAttachment(id: SecureHash): Attachment? {
|
|
||||||
val attachment = currentDBSession().get(NodeAttachmentService.DBAttachment::class.java, id.toString())
|
// slightly complex 2 level approach to attachment caching:
|
||||||
attachment?.let {
|
// On the first level we cache attachment contents loaded from the DB by their key. This is a weight based
|
||||||
return AttachmentImpl(id, { attachment.content }, checkAttachmentsOnLoad)
|
// cache (we don't want to waste too much memory on this) and could be evicted quite aggressively. If we fail
|
||||||
|
// to load an attachment from the db, the loader will insert a non present optional - we invalidate this
|
||||||
|
// immediately as we definitely want to retry whether the attachment was just delayed.
|
||||||
|
// On the second level, we cache Attachment implementations that use the first cache to load their content
|
||||||
|
// when required. As these are fairly small, we can cache quite a lot of them, this will make checking
|
||||||
|
// repeatedly whether an attachment exists fairly cheap. Here as well, we evict non-existent entries immediately
|
||||||
|
// to force a recheck if required.
|
||||||
|
// If repeatedly looking for non-existing attachments becomes a performance issue, this is either indicating a
|
||||||
|
// a problem somewhere else or this needs to be revisited.
|
||||||
|
|
||||||
|
private val attachmentContentCache = NonInvalidatingWeightBasedCache<SecureHash, Optional<ByteArray>>(
|
||||||
|
maxWeight = attachmentContentCacheSize,
|
||||||
|
concurrencyLevel = defaultCordaCacheConcurrencyLevel,
|
||||||
|
weigher = object : Weigher<SecureHash, Optional<ByteArray>> {
|
||||||
|
override fun weigh(key: SecureHash, value: Optional<ByteArray>): Int {
|
||||||
|
return key.size + if (value.isPresent) value.get().size else 0
|
||||||
}
|
}
|
||||||
|
},
|
||||||
|
loadFunction = { Optional.ofNullable(loadAttachmentContent(it)) }
|
||||||
|
)
|
||||||
|
|
||||||
|
private fun loadAttachmentContent(id: SecureHash): ByteArray? {
|
||||||
|
val attachment = currentDBSession().get(NodeAttachmentService.DBAttachment::class.java, id.toString())
|
||||||
|
return attachment?.content
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private val attachmentCache = NonInvalidatingCache<SecureHash, Optional<Attachment>>(
|
||||||
|
attachmentCacheBound,
|
||||||
|
defaultCordaCacheConcurrencyLevel,
|
||||||
|
{ key -> Optional.ofNullable(createAttachment(key)) }
|
||||||
|
)
|
||||||
|
|
||||||
|
private fun createAttachment(key: SecureHash): Attachment? {
|
||||||
|
val content = attachmentContentCache.get(key)
|
||||||
|
if (content.isPresent) {
|
||||||
|
return AttachmentImpl(
|
||||||
|
key,
|
||||||
|
{
|
||||||
|
attachmentContentCache
|
||||||
|
.get(key)
|
||||||
|
.orElseThrow {
|
||||||
|
IllegalArgumentException("No attachement impl should have been created for non existent content")
|
||||||
|
}
|
||||||
|
},
|
||||||
|
checkAttachmentsOnLoad)
|
||||||
|
}
|
||||||
|
// if no attachement has been found, we don't want to cache that - it might arrive later
|
||||||
|
attachmentContentCache.invalidate(key)
|
||||||
|
return null
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun openAttachment(id: SecureHash): Attachment? {
|
||||||
|
val attachment = attachmentCache.get(id)
|
||||||
|
if (attachment.isPresent) {
|
||||||
|
return attachment.get()
|
||||||
|
}
|
||||||
|
attachmentCache.invalidate(id)
|
||||||
return null
|
return null
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -45,3 +45,5 @@ class NonInvalidatingWeightBasedCache<K, V> private constructor(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
val defaultCordaCacheConcurrencyLevel: Int = 8
|
@ -13,18 +13,18 @@ import net.corda.core.node.services.vault.AttachmentQueryCriteria
|
|||||||
import net.corda.core.node.services.vault.AttachmentSort
|
import net.corda.core.node.services.vault.AttachmentSort
|
||||||
import net.corda.core.node.services.vault.Builder
|
import net.corda.core.node.services.vault.Builder
|
||||||
import net.corda.core.node.services.vault.Sort
|
import net.corda.core.node.services.vault.Sort
|
||||||
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
|
||||||
import net.corda.node.internal.configureDatabase
|
import net.corda.node.internal.configureDatabase
|
||||||
|
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||||
import net.corda.testing.internal.LogHelper
|
import net.corda.testing.internal.LogHelper
|
||||||
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
|
||||||
import net.corda.testing.internal.rigorousMock
|
import net.corda.testing.internal.rigorousMock
|
||||||
|
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
||||||
import org.junit.After
|
import org.junit.After
|
||||||
import org.junit.Before
|
import org.junit.Before
|
||||||
import org.junit.Ignore
|
import org.junit.Ignore
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import java.nio.charset.Charset
|
import java.nio.charset.StandardCharsets
|
||||||
import java.nio.file.FileAlreadyExistsException
|
import java.nio.file.FileAlreadyExistsException
|
||||||
import java.nio.file.FileSystem
|
import java.nio.file.FileSystem
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
@ -66,10 +66,10 @@ class NodeAttachmentStorageTest {
|
|||||||
val stream = storage.openAttachment(expectedHash)!!.openAsJAR()
|
val stream = storage.openAttachment(expectedHash)!!.openAsJAR()
|
||||||
val e1 = stream.nextJarEntry!!
|
val e1 = stream.nextJarEntry!!
|
||||||
assertEquals("test1.txt", e1.name)
|
assertEquals("test1.txt", e1.name)
|
||||||
assertEquals(stream.readBytes().toString(Charset.defaultCharset()), "This is some useful content")
|
assertEquals(stream.readBytes().toString(StandardCharsets.UTF_8), "This is some useful content")
|
||||||
val e2 = stream.nextJarEntry!!
|
val e2 = stream.nextJarEntry!!
|
||||||
assertEquals("test2.txt", e2.name)
|
assertEquals("test2.txt", e2.name)
|
||||||
assertEquals(stream.readBytes().toString(Charset.defaultCharset()), "Some more useful content")
|
assertEquals(stream.readBytes().toString(StandardCharsets.UTF_8), "Some more useful content")
|
||||||
|
|
||||||
stream.close()
|
stream.close()
|
||||||
|
|
||||||
@ -80,6 +80,44 @@ class NodeAttachmentStorageTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `missing is not cached`() {
|
||||||
|
val (testJar, expectedHash) = makeTestJar()
|
||||||
|
val (jarB, hashB) = makeTestJar(listOf(Pair("file", "content")))
|
||||||
|
|
||||||
|
database.transaction {
|
||||||
|
val storage = NodeAttachmentService(MetricRegistry())
|
||||||
|
val id = testJar.read { storage.importAttachment(it) }
|
||||||
|
assertEquals(expectedHash, id)
|
||||||
|
|
||||||
|
|
||||||
|
assertNull(storage.openAttachment(hashB))
|
||||||
|
val stream = storage.openAttachment(expectedHash)!!.openAsJAR()
|
||||||
|
val e1 = stream.nextJarEntry!!
|
||||||
|
assertEquals("test1.txt", e1.name)
|
||||||
|
assertEquals(stream.readBytes().toString(StandardCharsets.UTF_8), "This is some useful content")
|
||||||
|
val e2 = stream.nextJarEntry!!
|
||||||
|
assertEquals("test2.txt", e2.name)
|
||||||
|
assertEquals(stream.readBytes().toString(StandardCharsets.UTF_8), "Some more useful content")
|
||||||
|
|
||||||
|
stream.close()
|
||||||
|
|
||||||
|
val idB = jarB.read { storage.importAttachment(it) }
|
||||||
|
assertEquals(hashB, idB)
|
||||||
|
|
||||||
|
storage.openAttachment(id)!!.openAsJAR().use {
|
||||||
|
it.nextJarEntry
|
||||||
|
it.readBytes()
|
||||||
|
}
|
||||||
|
|
||||||
|
storage.openAttachment(idB)!!.openAsJAR().use {
|
||||||
|
it.nextJarEntry
|
||||||
|
it.readBytes()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `metadata can be used to search`() {
|
fun `metadata can be used to search`() {
|
||||||
val (jarA, _) = makeTestJar()
|
val (jarA, _) = makeTestJar()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user