ENT-2991 Deadlock between AppendOnlyPersistentMap and underlying database (#4676)

* Remove locks around database access in AppendOnlyPersistentMap and introduce
a unit test that checks that known deadlock scenarios of the old version
are avoided.

* Fix Deadlock unit test

* Add some extra latching to try and make timing less fragile.  Can never be perfect though.

* Review feedback, and some thread safety fixes.
This commit is contained in:
Christian Sailer 2019-01-30 18:20:53 +00:00 committed by Rick Parker
parent 91c2d75f31
commit deba96dce9
4 changed files with 331 additions and 67 deletions

View File

@ -79,7 +79,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
private const val transactionSignatureOverheadEstimate = 1024 private const val transactionSignatureOverheadEstimate = 1024
private fun weighTx(tx: AppendOnlyPersistentMapBase.Transactional<TxCacheValue>): Int { private fun weighTx(tx: AppendOnlyPersistentMapBase.Transactional<TxCacheValue>): Int {
val actTx = tx.valueWithoutIsolation val actTx = tx.peekableValue
if (actTx == null) { if (actTx == null) {
return 0 return 0
} }

View File

@ -63,23 +63,24 @@ abstract class AppendOnlyPersistentMapBase<K, V, E, out EK>(
when (oldValue) { when (oldValue) {
is Transactional.InFlight<*, V> -> { is Transactional.InFlight<*, V> -> {
// Someone else is writing, so store away! // Someone else is writing, so store away!
// TODO: we can do collision detection here and prevent it happening in the database. But we also have to do deadlock detection, so a bit of work.
isUnique = (store(key, value) == null) isUnique = (store(key, value) == null)
oldValue.apply { alsoWrite(value) } oldValue.apply { alsoWrite(value) }
} }
is Transactional.Committed<V> -> oldValue // The value is already globally visible and cached. So do nothing since the values are always the same. is Transactional.Committed<V> -> oldValue // The value is already globally visible and cached. So do nothing since the values are always the same.
else -> { is Transactional.Unknown<*, V> -> {
// Null or Missing. Store away! if (oldValue.isResolved && oldValue.isPresent) {
isUnique = (store(key, value) == null) Transactional.Committed(oldValue.value)
if (!isUnique && !weAreWriting(key)) {
// If we found a value already in the database, and we were not already writing, then it's already committed but got evicted.
Transactional.Committed(value)
} else { } else {
// Some database transactions, including us, writing, with readers seeing whatever is in the database and writers seeing the (in memory) value. // Unknown. Store away!
Transactional.InFlight(this, key, _readerValueLoader = { loadValue(key) }).apply { alsoWrite(value) } isUnique = (store(key, value) == null)
transactionalForStoreResult(isUnique, key, value)
} }
} }
else -> {
// Missing or null. Store away!
isUnique = (store(key, value) == null)
transactionalForStoreResult(isUnique, key, value)
}
} }
} }
if (logWarning && !isUnique) { if (logWarning && !isUnique) {
@ -88,6 +89,16 @@ abstract class AppendOnlyPersistentMapBase<K, V, E, out EK>(
return isUnique return isUnique
} }
/**
 * Chooses the cache entry to install once [value] has been handed to the store for [key].
 *
 * @param isUnique whether the store reported no pre-existing value for [key].
 * @param key the key that was stored.
 * @param value the value that was stored.
 * @return [Transactional.Committed] when a value already existed and the current database transaction is not
 * a pending writer of [key]; otherwise a [Transactional.InFlight] that has [value] registered via `alsoWrite`.
 */
private fun transactionalForStoreResult(isUnique: Boolean, key: K, value: V): Transactional<V> {
    if (isUnique || weAreWriting(key)) {
        // At least one database transaction (ours included) is writing: readers go back to the database,
        // writers see the in-memory value.
        val inFlight = Transactional.InFlight(this, key, _readerValueLoader = { loadValue(key) })
        inFlight.alsoWrite(value)
        return inFlight
    }
    // A value was already in the database and we were not writing it, so it is committed and was merely evicted.
    return Transactional.Committed(value)
}
/** /**
* Associates the specified value with the specified key in this map and persists it. * Associates the specified value with the specified key in this map and persists it.
* If the map previously contained a mapping for the key, the behaviour is unpredictable and may throw an error from the underlying storage. * If the map previously contained a mapping for the key, the behaviour is unpredictable and may throw an error from the underlying storage.
@ -122,7 +133,7 @@ abstract class AppendOnlyPersistentMapBase<K, V, E, out EK>(
} }
} }
protected fun loadValue(key: K): V? { private fun loadValue(key: K): V? {
val session = currentDBSession() val session = currentDBSession()
val flushing = contextTransaction.flushing val flushing = contextTransaction.flushing
if (!flushing) { if (!flushing) {
@ -134,6 +145,19 @@ abstract class AppendOnlyPersistentMapBase<K, V, E, out EK>(
return result?.apply { if (!flushing) session.detach(result) }?.let(fromPersistentEntity)?.second return result?.apply { if (!flushing) session.detach(result) }?.let(fromPersistentEntity)?.second
} }
/**
 * Builds the cache entry for [key] when a value is read and the cache holds no [Transactional] for it yet.
 * Every loader passed to the returned entry calls [loadValue].
 *
 * @param key the key being read.
 * @return a [Transactional.Unknown] when no transaction has [key] pending, otherwise a [Transactional.InFlight].
 */
protected fun transactionalLoadValue(key: K): Transactional<V> {
    if (!anyoneWriting(key)) {
        // Nobody is writing, so the value may or may not exist in the database.
        return Transactional.Unknown(this, key, { loadValue(key) })
    }
    // Some transaction has this key pending.
    // Non-writers re-load the value from the database (which their database transaction MIGHT see).
    // Writers re-load the value from the database (which their database transaction CAN see).
    return Transactional.InFlight(this, key, { loadValue(key) }, { loadValue(key)!! })
}
operator fun contains(key: K) = get(key) != null operator fun contains(key: K) = get(key) != null
/** /**
@ -149,8 +173,9 @@ abstract class AppendOnlyPersistentMapBase<K, V, E, out EK>(
} }
// Helpers to know if transaction(s) are currently writing the given key. // Helpers to know if transaction(s) are currently writing the given key.
protected fun weAreWriting(key: K): Boolean = pendingKeys[key]?.contains(contextTransaction) ?: false private fun weAreWriting(key: K): Boolean = pendingKeys[key]?.contains(contextTransaction) ?: false
protected fun anyoneWriting(key: K): Boolean = pendingKeys[key]?.isNotEmpty() ?: false
private fun anyoneWriting(key: K): Boolean = pendingKeys[key]?.isNotEmpty() ?: false
// Indicate this database transaction is a writer of this key. // Indicate this database transaction is a writer of this key.
private fun addPendingKey(key: K, databaseTransaction: DatabaseTransaction): Boolean { private fun addPendingKey(key: K, databaseTransaction: DatabaseTransaction): Boolean {
@ -189,7 +214,7 @@ abstract class AppendOnlyPersistentMapBase<K, V, E, out EK>(
sealed class Transactional<T> { sealed class Transactional<T> {
abstract val value: T abstract val value: T
abstract val isPresent: Boolean abstract val isPresent: Boolean
abstract val valueWithoutIsolation: T? abstract val peekableValue: T?
fun orElse(alt: T?) = if (isPresent) value else alt fun orElse(alt: T?) = if (isPresent) value else alt
@ -197,7 +222,7 @@ abstract class AppendOnlyPersistentMapBase<K, V, E, out EK>(
class Committed<T>(override val value: T) : Transactional<T>() { class Committed<T>(override val value: T) : Transactional<T>() {
override val isPresent: Boolean override val isPresent: Boolean
get() = true get() = true
override val valueWithoutIsolation: T? override val peekableValue: T?
get() = value get() = value
} }
@ -207,10 +232,32 @@ abstract class AppendOnlyPersistentMapBase<K, V, E, out EK>(
get() = throw NoSuchElementException("Not present") get() = throw NoSuchElementException("Not present")
override val isPresent: Boolean override val isPresent: Boolean
get() = false get() = false
override val valueWithoutIsolation: T? override val peekableValue: T?
get() = null get() = null
} }
// Nobody is writing and the database has not been consulted yet. This state only arises while there are no writers.
class Unknown<K, T>(private val map: AppendOnlyPersistentMapBase<K, T, *, *>,
                    private val key: K,
                    private val _valueLoader: () -> T?) : Transactional<T>() {
    // Runs the loader on first access and then swaps this entry in the cache for a Missing or Committed one,
    // so that any weigher can re-assess the weight based on the loaded value. The swap only happens while
    // this very instance is still the cached entry for the key.
    private val resolvedValue = lazy(LazyThreadSafetyMode.PUBLICATION) {
        val loaded = _valueLoader()
        map.cache.asMap().compute(key) { _, current ->
            when {
                current !== this@Unknown -> current
                loaded == null -> Missing()
                else -> Committed(loaded)
            }
        }
        loaded
    }

    // True once the loader has been evaluated.
    val isResolved: Boolean get() = resolvedValue.isInitialized()

    override val isPresent: Boolean
        get() = resolvedValue.value != null

    override val value: T
        get() = resolvedValue.value ?: throw NoSuchElementException("Not present")

    // Only reveals a value that has already been loaded; never triggers the load itself.
    override val peekableValue: T?
        get() = if (!isResolved || !isPresent) null else value
}
// Written in a transaction (uncommitted) somewhere, but there's a small window when this might be seen after commit, // Written in a transaction (uncommitted) somewhere, but there's a small window when this might be seen after commit,
// hence the committed flag. // hence the committed flag.
class InFlight<K, T>(private val map: AppendOnlyPersistentMapBase<K, T, *, *>, class InFlight<K, T>(private val map: AppendOnlyPersistentMapBase<K, T, *, *>,
@ -262,21 +309,21 @@ abstract class AppendOnlyPersistentMapBase<K, V, E, out EK>(
// Lazy load the value a "writer" would see. If the original loader hasn't been replaced, replace it // Lazy load the value a "writer" would see. If the original loader hasn't been replaced, replace it
// with one that just returns the value once evaluated. // with one that just returns the value once evaluated.
private fun loadAsWriter(): T { private fun loadAsWriter(): T {
val _value = writerValueLoader.get()() val writerValue = writerValueLoader.get()()
if (writerValueLoader.get() == _writerValueLoader) { if (writerValueLoader.get() == _writerValueLoader) {
writerValueLoader.set { _value } writerValueLoader.set { writerValue }
} }
return _value return writerValue
} }
// Lazy load the value a "reader" would see. If the original loader hasn't been replaced, replace it // Lazy load the value a "reader" would see. If the original loader hasn't been replaced, replace it
// with one that just returns the value once evaluated. // with one that just returns the value once evaluated.
private fun loadAsReader(): T? { private fun loadAsReader(): T? {
val _value = readerValueLoader.get()() val readerValue = readerValueLoader.get()()
if (readerValueLoader.get() == _readerValueLoader) { if (readerValueLoader.get() == _readerValueLoader) {
readerValueLoader.set { _value } readerValueLoader.set { readerValue }
} }
return _value return readerValue
} }
// Whether someone reading (only) can see the entry. // Whether someone reading (only) can see the entry.
@ -293,7 +340,7 @@ abstract class AppendOnlyPersistentMapBase<K, V, E, out EK>(
get() = if (isPresentAsWriter) loadAsWriter() else if (isPresentAsReader) loadAsReader()!! else throw NoSuchElementException("Not present") get() = if (isPresentAsWriter) loadAsWriter() else if (isPresentAsReader) loadAsReader()!! else throw NoSuchElementException("Not present")
// The value from the perspective of the eviction algorithm of the cache. i.e. we want to reveal memory footprint to it etc.
// A loader is only invoked here once it differs from the original loader it started out as, so each loader must be
// compared with its own original: the writer loader with _writerValueLoader, the reader loader with _readerValueLoader.
// (The reader loader used to be compared with _writerValueLoader, which let a peek invoke the original reader loader.)
override val peekableValue: T?
    get() = when {
        writerValueLoader.get() != _writerValueLoader -> writerValueLoader.get()()
        readerValueLoader.get() != _readerValueLoader -> readerValueLoader.get()()
        else -> null
    }
} }
} }
@ -315,33 +362,7 @@ open class AppendOnlyPersistentMap<K, V, E, out EK>(
override val cache = NonInvalidatingCache( override val cache = NonInvalidatingCache(
cacheFactory = cacheFactory, cacheFactory = cacheFactory,
name = name, name = name,
loadFunction = { key: K -> loadFunction = { key: K -> transactionalLoadValue(key) })
// This gets called if a value is read and the cache has no Transactional for this key yet.
val value: V? = loadValue(key)
if (value == null) {
// No visible value
if (anyoneWriting(key)) {
// If someone is writing (but not us)
// For those not writing, the value cannot be seen.
// For those writing, they need to re-load the value from the database (which their database transaction CAN see).
Transactional.InFlight(this, key, { null }, { loadValue(key)!! })
} else {
// If no one is writing, then the value does not exist.
Transactional.Missing()
}
} else {
// A value was found
if (weAreWriting(key)) {
// If we are writing, it might not be globally visible, and was evicted from the cache.
// For those not writing, they need to check the database again.
// For those writing, they can see the value found.
Transactional.InFlight(this, key, { loadValue(key) }, { value })
} else {
// If no one is writing, then make it globally visible.
Transactional.Committed(value)
}
}
})
} }
// Same as above, but with weighted values (e.g. memory footprint sensitive). // Same as above, but with weighted values (e.g. memory footprint sensitive).
@ -362,20 +383,5 @@ class WeightBasedAppendOnlyPersistentMap<K, V, E, out EK>(
cacheFactory = cacheFactory, cacheFactory = cacheFactory,
name = name, name = name,
weigher = Weigher { key, value -> weighingFunc(key, value) }, weigher = Weigher { key, value -> weighingFunc(key, value) },
loadFunction = { key: K -> loadFunction = { key: K -> transactionalLoadValue(key) })
val value: V? = loadValue(key)
if (value == null) {
if (anyoneWriting(key)) {
Transactional.InFlight(this, key, { null }, { loadValue(key)!! })
} else {
Transactional.Missing()
}
} else {
if (weAreWriting(key)) {
Transactional.InFlight(this, key, { loadValue(key) }, { value })
} else {
Transactional.Committed(value)
}
}
})
} }

View File

@ -0,0 +1,233 @@
package net.corda.node.services.persistence
import net.corda.core.schemas.MappedSchema
import net.corda.core.utilities.contextLogger
import net.corda.node.internal.createCordaPersistence
import net.corda.node.internal.startHikariPool
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.internal.persistence.TransactionIsolationLevel
import net.corda.testing.internal.TestingNamedCacheFactory
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import java.lang.Thread.sleep
import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.atomic.AtomicReference
import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Id
import kotlin.concurrent.thread
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
/**
 * Map key wrapping an [Int]. Two keys are equal when their [value]s are equal, while every key
 * reports the same hash code.
 */
class TestKey(val value: Int) {
    override fun equals(other: Any?): Boolean = other is TestKey && other.value == value

    /**
     * Hash code is constant to provoke hash clashes in ConcurrentHashMap
     */
    override fun hashCode(): Int = 127
}
/**
 * JPA entity mapped to the "locktestobjects" table: an integer id column "lKey" and an integer column "lValue",
 * both declared non-nullable.
 */
@Entity
@javax.persistence.Table(name = "locktestobjects")
class MyPersistenceClass(
@Id
@Column(name = "lKey", nullable = false)
val key: Int,
@Column(name = "lValue", nullable = false)
val value: Int)
/**
 * JPA entity mapped to the "otherlockobjects" table: an integer id column "lKey" and an integer column "lValue",
 * both declared non-nullable.
 */
@Entity
@javax.persistence.Table(name = "otherlockobjects")
class SecondPersistenceClass(
@Id
@Column(name = "lKey", nullable = false)
val key: Int,
@Column(name = "lValue", nullable = false)
val value: Int)
/** Empty marker object; its Java class is what LockDbSchemaV2 passes as the first argument to MappedSchema. */
object LockDbSchema
/**
 * Version 2 of the [LockDbSchema] family, mapping [MyPersistenceClass] and [SecondPersistenceClass],
 * with "locktestschema" as its migration resource.
 */
object LockDbSchemaV2 : MappedSchema(
        LockDbSchema::class.java,
        2,
        listOf(MyPersistenceClass::class.java, SecondPersistenceClass::class.java)
) {
    override val migrationResource: String? = "locktestschema"
}
/**
 * Checks that two database transactions which read the same key through an [AppendOnlyPersistentMap], while also
 * writing the same key to a second table, do not deadlock each other. Primary key constraint violations are
 * tolerated (see [checkException]); any other exception fails the test.
 */
class DbMapDeadlockTest {
companion object {
val log = contextLogger()
}
@Rule
@JvmField
val temporaryFolder = TemporaryFolder()
// Builds a fresh set of data source properties on every access, pointing at an H2 file database inside the
// temporary folder.
private val h2Properties: Properties
get() {
return Properties().also {
it.setProperty("dataSourceClassName", "org.h2.jdbcx.JdbcDataSource")
it.setProperty("dataSource.url", "jdbc:h2:file:${temporaryFolder.root}/persistence;DB_CLOSE_ON_EXIT=FALSE;WRITE_DELAY=0;LOCK_TIMEOUT=10000")
it.setProperty("dataSource.user", "sa")
it.setProperty("dataSource.password", "")
}
}
@Test
fun checkAppendOnlyPersistentMapForDeadlockH2() {
recreateDeadlock(h2Properties)
}
/**
 * Runs the two-thread scenario against the database described by [hikariProperties].
 */
fun recreateDeadlock(hikariProperties: Properties) {
val cacheFactory = TestingNamedCacheFactory()
val dbConfig = DatabaseConfig(initialiseSchema = true, transactionIsolationLevel = TransactionIsolationLevel.READ_COMMITTED)
val schemaService = NodeSchemaService(extraSchemas = setOf(LockDbSchemaV2))
createCordaPersistence(dbConfig, { null }, { null }, schemaService, hikariProperties, cacheFactory, null).apply {
startHikariPool(hikariProperties, dbConfig, schemaService.schemaOptions.keys)
}.use { persistence ->
// First clean up any remains from previous test runs
persistence.transaction {
session.createNativeQuery("delete from locktestobjects").executeUpdate()
session.createNativeQuery("delete from otherlockobjects").executeUpdate()
}
// Prepare a few rows for reading in table 1
val prepMap = AppendOnlyPersistentMap<TestKey, Int, MyPersistenceClass, Int>(
cacheFactory,
"myTestCache",
{ k -> k.value },
{ e -> Pair(TestKey(e.key), e.value) },
{ k, v -> MyPersistenceClass(k.value, v) },
MyPersistenceClass::class.java
)
persistence.transaction {
prepMap.set(TestKey(1), 1)
prepMap.set(TestKey(2), 2)
prepMap.set(TestKey(10), 10)
}
// the map that will read from the prepared table
val testMap = AppendOnlyPersistentMap<TestKey, Int, MyPersistenceClass, Int>(
cacheFactory,
"myTestCache",
{ k -> k.value },
{ e -> Pair(TestKey(e.key), e.value) },
{ k, v -> MyPersistenceClass(k.value, v) },
MyPersistenceClass::class.java
)
// a second map that writes to another (unrelated) table
val otherMap = AppendOnlyPersistentMap<TestKey, Int, SecondPersistenceClass, Int>(
cacheFactory,
"myTestCache",
{ k -> k.value },
{ e -> Pair(TestKey(e.key), e.value) },
{ k, v -> SecondPersistenceClass(k.value, v) },
SecondPersistenceClass::class.java
)
// latch1: both threads meet here before the main thread starts its transaction.
// latch2: released by the main thread once it has written key 100 and read key 1; Thread2 waits on it.
// latch3: both threads meet here after Thread2 has set key 100 and just before each of them goes on to read key 10.
val latch1 = CyclicBarrier(2)
val latch2 = CountDownLatch(1)
val latch3 = CyclicBarrier(2)
val otherThreadException = AtomicReference<Exception?>(null)
// This thread will wait for the main thread to do a few things. Then it will start by reading key 2, and write a key to
// the second table. This write will be buffered (not flushed) at first. The subsequent access to read value 10 from the
// first table will cause the previous write to flush. As the row this will be writing to should be locked by the main
// thread, it will wait for the main thread's db transaction to commit or roll back before proceeding with the read.
val otherThread = thread(name = "testThread2") {
try {
log.info("Thread2 waiting")
latch1.await()
latch2.await()
log.info("Thread2 starting transaction")
persistence.transaction {
log.info("Thread2 getting key 2")
testMap.get(TestKey(2))
log.info("Thread2 set other value 100")
otherMap.set(TestKey(100), 100)
latch3.await()
log.info("Thread2 getting value 10")
val v = testMap.get(TestKey(10))
assertEquals(10, v)
}
log.info("Thread2 done")
} catch (e: Exception) {
log.info("Thread2 threw") // Don't log the exception though, since we expect it and check in the assertions what it is.
otherThreadException.set(e)
}
}
log.info("MainThread waiting for Thread2 to start waiting")
latch1.await()
// The main thread will write to the same key in the second table, and then read key 1 from the read table. As it will do that
// before triggering the run on thread 2, it will get the row lock in the second table when flushing before the read, then
// read and carry on.
log.info("MainThread starting transaction")
persistence.transaction {
log.info("MainThread getting key 2")
testMap.get(TestKey(2))
log.info("MainThread set other key 100")
otherMap.set(TestKey(100), 100)
log.info("MainThread getting key 1")
testMap.get(TestKey(1))
// Then it will trigger the start of the second thread (see above) and then sleep for a bit to make sure the other
// thread actually runs and beats this thread to the get(10). The test will still pass if it doesn't.
log.info("MainThread signal")
latch2.countDown()
log.info("MainThread wait for Thread2 to be getting the same key")
latch3.await()
log.info("MainThread sleep for 2 seconds so ideally Thread2 reaches the get first")
sleep(2000)
// finally it will try to get the same value from the read table that the other thread is trying to read.
// If access to reading this value from the DB is guarded by a lock, the other thread will be holding this lock
// which means the threads are now deadlocked.
log.info("MainThread get value 10")
try {
assertEquals(10, testMap.get(TestKey(10)))
} catch (e: Exception) {
checkException(e)
}
}
log.info("MainThread joining with Thread2")
otherThread.join()
// Thread2 is expected to have failed; checkException then decides whether the failure was an acceptable one.
assertNotNull(otherThreadException.get())
checkException(otherThreadException.get())
log.info("MainThread done")
}
}
// We have to catch any exception thrown and check what they are - primary key constraint violations are fine, we are trying
// to insert the same key twice after all. Any deadlock time outs or similar are completely not fine and should be a test failure.
// A null exception is accepted silently; anything that is not a PersistenceException caused by a Hibernate
// ConstraintViolationException is rethrown.
private fun checkException(exception: Exception?) {
if (exception == null) {
return
}
val persistenceException = exception as? javax.persistence.PersistenceException
if (persistenceException != null) {
val hibernateException = persistenceException.cause as? org.hibernate.exception.ConstraintViolationException
if (hibernateException != null) {
log.info("Primary key violation exception is fine")
return
}
}
throw exception
}
}

View File

@ -0,0 +1,25 @@
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd">
<!-- Creates the two tables "locktestobjects" and "otherlockobjects". Each has non-nullable INT columns
     lKey and lValue, with a primary key on lKey. -->
<changeSet author="R3.Corda" id="rhubarb-crumble-1">
<createTable tableName="locktestobjects">
<column name="lKey" type="INT">
<constraints nullable="false"/>
</column>
<column name="lValue" type="INT">
<constraints nullable="false"/>
</column>
</createTable>
<addPrimaryKey columnNames="lKey" constraintName="locktest_pkey" tableName="locktestobjects"/>
<createTable tableName="otherlockobjects">
<column name="lKey" type="INT">
<constraints nullable="false"/>
</column>
<column name="lValue" type="INT">
<constraints nullable="false"/>
</column>
</createTable>
<addPrimaryKey columnNames="lKey" constraintName="otherlock_pkey" tableName="otherlockobjects"/>
</changeSet>
</databaseChangeLog>