mirror of
https://github.com/corda/corda.git
synced 2025-01-30 08:04:16 +00:00
ENT-3165 Backport caching changes to OS. (#4821)
This commit is contained in:
parent
4721b1d095
commit
2ff7860e4b
@ -2,16 +2,18 @@ package net.corda.node.utilities
|
||||
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import net.corda.core.internal.NamedCacheFactory
|
||||
import net.corda.core.internal.VisibleForTesting
|
||||
import net.corda.nodeapi.internal.persistence.contextTransactionOrNull
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
/**
|
||||
* Wraps a Caffeine cache and provides thread safe and database transaction aware invalidation.
|
||||
*
|
||||
* All access should be via [get] and [invalidate]. Data to be mutated should be changed at source (presumed to be a database)
|
||||
* followed by a call to [invalidate] the value associated with a key. During periods of invalidity, the source will always be
|
||||
* consulted to resolve transaction visibility issues. This is why invalidation should be infrequent, otherwise the pessimism
|
||||
* of the cache for invalidated values will result in few cache hits.
|
||||
* All access should be via [get], [getIfPresent] or [invalidate]. Data to be mutated should be changed at source
|
||||
* (presumed to be a database) followed by a call to [invalidate] the value associated with a key.
|
||||
* During periods of invalidity, the source will always be consulted to resolve transaction visibility issues.
|
||||
* This is why invalidation should be infrequent, otherwise the pessimism of the cache for invalidated values will result in few cache hits.
|
||||
*/
|
||||
class InfrequentlyMutatedCache<K : Any, V : Any>(name: String, cacheFactory: NamedCacheFactory) {
|
||||
/**
|
||||
@ -21,8 +23,8 @@ class InfrequentlyMutatedCache<K : Any, V : Any>(name: String, cacheFactory: Nam
|
||||
* @param valueGetter A function to return the value for the key if the cache does not have it.
|
||||
*/
|
||||
fun get(key: K, valueGetter: (K) -> V): V {
|
||||
val wrapper = backingCache.get(key) { key: K ->
|
||||
Wrapper.Valid(valueGetter(key))
|
||||
val wrapper = backingCache.get(key) { k: K ->
|
||||
currentlyInvalid[k] ?: Wrapper.Valid(valueGetter(k))
|
||||
}
|
||||
return when(wrapper) {
|
||||
is Wrapper.Valid -> { wrapper.value }
|
||||
@ -30,6 +32,23 @@ class InfrequentlyMutatedCache<K : Any, V : Any>(name: String, cacheFactory: Nam
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve the value associated with the given key in the cache, or null if not cached.
|
||||
*
|
||||
* @param key The key to retrieve.
|
||||
*/
|
||||
fun getIfPresent(key: K): V? {
|
||||
val wrapper = backingCache.get(key) { k: K ->
|
||||
null
|
||||
}
|
||||
return when (wrapper) {
|
||||
is Wrapper.Valid -> {
|
||||
wrapper.value
|
||||
}
|
||||
else -> null
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Inform the cache that the current value for the key may have been updated. Subsequent calls to [get]
|
||||
* will not use the current cached value. The point at which values start to be cached again will be
|
||||
@ -37,18 +56,27 @@ class InfrequentlyMutatedCache<K : Any, V : Any>(name: String, cacheFactory: Nam
|
||||
* who do not have transaction visibility of the updated value from re-populating the cache with an incorrect value.
|
||||
*/
|
||||
fun invalidate(key: K) {
|
||||
backingCache.asMap().compute(key) { key: K, value: Wrapper<V>? ->
|
||||
backingCache.asMap().compute(key) { k: K, value: Wrapper<V>? ->
|
||||
when(value) {
|
||||
is Wrapper.Valid -> { invalidate(key, Wrapper.Invalidated()) }
|
||||
is Wrapper.Invalidated -> { invalidate(key, value) }
|
||||
else -> { null }
|
||||
is Wrapper.Invalidated -> {
|
||||
invalidate(k, value)
|
||||
}
|
||||
else -> {
|
||||
invalidate(k, currentlyInvalid[k] ?: Wrapper.Invalidated())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
internal fun flushCache() {
|
||||
backingCache.invalidateAll()
|
||||
}
|
||||
|
||||
private fun invalidate(key: K, value: Wrapper.Invalidated<V>): Wrapper.Invalidated<V> {
|
||||
val tx = contextTransactionOrNull
|
||||
value.invalidators.incrementAndGet()
|
||||
currentlyInvalid[key] = value
|
||||
if (tx != null) {
|
||||
// When we close, we can't start using caching again until all simultaneously open transactions are closed.
|
||||
tx.onClose { tx.database.onAllOpenTransactionsClosed { decrementInvalidators(key, value) } }
|
||||
@ -63,6 +91,7 @@ class InfrequentlyMutatedCache<K : Any, V : Any>(name: String, cacheFactory: Nam
|
||||
// Maybe we can replace the invalidated value with nothing, so it gets loaded next time.
|
||||
backingCache.asMap().compute(key) { key: K, currentValue: Wrapper<V>? ->
|
||||
if(currentValue === value && value.invalidators.get() == 0) {
|
||||
currentlyInvalid.remove(key)
|
||||
null
|
||||
} else currentValue
|
||||
}
|
||||
@ -71,14 +100,14 @@ class InfrequentlyMutatedCache<K : Any, V : Any>(name: String, cacheFactory: Nam
|
||||
|
||||
private val backingCache = cacheFactory.buildNamed<K, Wrapper<V>>(Caffeine.newBuilder(), name)
|
||||
|
||||
private sealed class Wrapper<V : Any> {
|
||||
abstract val value: V?
|
||||
// This protects against the cache purging something that is marked as invalid and thus we "forget" it shouldn't be cached.
|
||||
private val currentlyInvalid = ConcurrentHashMap<K, Wrapper.Invalidated<V>>()
|
||||
|
||||
private sealed class Wrapper<V : Any> {
|
||||
class Invalidated<V : Any> : Wrapper<V>() {
|
||||
val invalidators = AtomicInteger(0)
|
||||
override val value: V? = null
|
||||
}
|
||||
|
||||
class Valid<V : Any>(override val value: V) : Wrapper<V>()
|
||||
class Valid<V : Any>(val value: V) : Wrapper<V>()
|
||||
}
|
||||
}
|
@ -11,6 +11,7 @@ import org.junit.Test
|
||||
import java.util.concurrent.Phaser
|
||||
import kotlin.concurrent.thread
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertNull
|
||||
|
||||
class InfrequentlyMutatedCacheTest {
|
||||
private val cache = InfrequentlyMutatedCache<String, Int>("foo", TestingNamedCacheFactory())
|
||||
@ -35,6 +36,14 @@ class InfrequentlyMutatedCacheTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `getIfPresent from empty cache returns null`() {
|
||||
database.transaction {
|
||||
val result = cache.getIfPresent("foo")
|
||||
assertNull(result)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `other thread get returns result of local thread loader`() {
|
||||
database.transaction {
|
||||
@ -63,6 +72,18 @@ class InfrequentlyMutatedCacheTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `getIfPresent after get from empty cache returns result of first loader`() {
|
||||
database.transaction {
|
||||
// This will cache "2"
|
||||
cache.get("foo") {
|
||||
2
|
||||
}
|
||||
val result = cache.getIfPresent("foo")
|
||||
assertEquals(2, result)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `second get from empty cache with invalidate in the middle returns result of second loader`() {
|
||||
database.transaction {
|
||||
@ -78,6 +99,38 @@ class InfrequentlyMutatedCacheTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `getIfPresent after get from empty cache with invalidate in the middle returns null`() {
|
||||
database.transaction {
|
||||
// This will cache "2"
|
||||
cache.get("foo") {
|
||||
2
|
||||
}
|
||||
cache.invalidate("foo")
|
||||
val result = cache.getIfPresent("foo")
|
||||
assertNull(result)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `second get from empty cache with invalidate and flush in the middle returns result of third loader`() {
|
||||
database.transaction {
|
||||
// This will cache "2"
|
||||
cache.get("foo") {
|
||||
3
|
||||
}
|
||||
cache.invalidate("foo")
|
||||
cache.flushCache()
|
||||
cache.get("foo") {
|
||||
2
|
||||
}
|
||||
val result = cache.get("foo") {
|
||||
1
|
||||
}
|
||||
assertEquals(1, result)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `other thread get with invalidate in the middle returns result of second loader`() {
|
||||
database.transaction {
|
||||
@ -118,6 +171,26 @@ class InfrequentlyMutatedCacheTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `getIfPresent outside first transaction from empty cache with invalidate in the middle returns result of third loader`() {
|
||||
database.transaction {
|
||||
// This will cache "2"
|
||||
cache.get("foo") {
|
||||
2
|
||||
}
|
||||
cache.invalidate("foo")
|
||||
// This should not get cached, as the transaction that invalidated is still in-flight.
|
||||
val result = cache.get("foo") {
|
||||
1
|
||||
}
|
||||
assertEquals(1, result)
|
||||
}
|
||||
database.transaction {
|
||||
val result = cache.getIfPresent("foo")
|
||||
assertNull(result)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `other thread get outside first transaction with invalidate in the middle returns result of other thread`() {
|
||||
database.transaction {
|
||||
|
Loading…
x
Reference in New Issue
Block a user