mirror of
https://github.com/corda/corda.git
synced 2024-12-21 13:57:54 +00:00
CORDA-2705 - Prevent duplicates in cache and fix the mappings persisted for confidential identities
This commit is contained in:
parent
1cd78a996f
commit
5f7f809084
@ -8,7 +8,6 @@ import net.corda.core.node.services.UnknownAnonymousPartyException
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.utilities.MAX_HASH_HEX_SIZE
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.node.services.api.IdentityServiceInternal
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||
import net.corda.nodeapi.internal.crypto.X509CertificateFactory
|
||||
@ -120,7 +119,7 @@ class PersistentIdentityService(cacheFactory: NamedCacheFactory) : SingletonSeri
|
||||
principalToParties.addWithDuplicatesAllowed(it.name, key, false)
|
||||
}
|
||||
confidentialIdentities.forEach {
|
||||
principalToParties.addWithDuplicatesAllowed(it.name, mapToKey(it), false)
|
||||
keyToParties.addWithDuplicatesAllowed(mapToKey(it), it, false)
|
||||
}
|
||||
log.debug("Identities loaded")
|
||||
}
|
||||
@ -138,17 +137,18 @@ class PersistentIdentityService(cacheFactory: NamedCacheFactory) : SingletonSeri
|
||||
}
|
||||
|
||||
override fun registerIdentity(identity: PartyAndCertificate, isNewRandomIdentity: Boolean): PartyAndCertificate? {
|
||||
log.debug("Registering identity $identity")
|
||||
val identityCertChain = identity.certPath.x509Certificates
|
||||
log.debug { "Registering identity $identity" }
|
||||
val key = mapToKey(identity)
|
||||
|
||||
if (isNewRandomIdentity) {
|
||||
// Because this is supposed to be new and random, there's no way we have it in the database already, so skip the pessimistic check.
|
||||
keyToParties[key] = identity
|
||||
} else {
|
||||
keyToParties.addWithDuplicatesAllowed(key, identity)
|
||||
principalToParties.addWithDuplicatesAllowed(identity.name, key, false)
|
||||
}
|
||||
// Always keep the first party we registered, as that's the well known identity
|
||||
principalToParties.addWithDuplicatesAllowed(identity.name, key, false)
|
||||
|
||||
val parentId = mapToKey(identityCertChain[1].publicKey)
|
||||
return keyToParties[parentId]
|
||||
}
|
||||
|
@ -58,28 +58,31 @@ abstract class AppendOnlyPersistentMapBase<K, V, E, out EK>(
|
||||
private fun set(key: K, value: V, logWarning: Boolean, store: (K, V) -> V?): Boolean {
|
||||
// Will be set to true if store says it isn't in the database.
|
||||
var isUnique = false
|
||||
cache.asMap().compute(key) { _, oldValue ->
|
||||
cache.asMap().compute(key) { _, oldValueInCache ->
|
||||
// Always write to the database, unless we can see it's already committed.
|
||||
when (oldValue) {
|
||||
when (oldValueInCache) {
|
||||
is Transactional.InFlight<*, V> -> {
|
||||
// Someone else is writing, so store away!
|
||||
isUnique = (store(key, value) == null)
|
||||
oldValue.apply { alsoWrite(value) }
|
||||
val oldValueInDB = store(key, value)
|
||||
isUnique = (oldValueInDB == null)
|
||||
oldValueInCache.apply { alsoWrite(value) }
|
||||
}
|
||||
is Transactional.Committed<V> -> oldValue // The value is already globally visible and cached. So do nothing since the values are always the same.
|
||||
is Transactional.Committed<V> -> oldValueInCache // The value is already globally visible and cached. So do nothing since the values are always the same.
|
||||
is Transactional.Unknown<*, V> -> {
|
||||
if (oldValue.isResolved && oldValue.isPresent) {
|
||||
Transactional.Committed(oldValue.value)
|
||||
if (oldValueInCache.isResolved && oldValueInCache.isPresent) {
|
||||
Transactional.Committed(oldValueInCache.value)
|
||||
} else {
|
||||
// Unknown. Store away!
|
||||
isUnique = (store(key, value) == null)
|
||||
transactionalForStoreResult(isUnique, key, value)
|
||||
val oldValueInDB = store(key, value)
|
||||
isUnique = (oldValueInDB == null)
|
||||
transactionalForStoreResult(key, value, oldValueInDB)
|
||||
}
|
||||
}
|
||||
else -> {
|
||||
// Missing or null. Store away!
|
||||
isUnique = (store(key, value) == null)
|
||||
transactionalForStoreResult(isUnique, key, value)
|
||||
val oldValueInDB = store(key, value)
|
||||
isUnique = (oldValueInDB == null)
|
||||
transactionalForStoreResult(key, value, oldValueInDB)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -89,10 +92,10 @@ abstract class AppendOnlyPersistentMapBase<K, V, E, out EK>(
|
||||
return isUnique
|
||||
}
|
||||
|
||||
private fun transactionalForStoreResult(isUnique: Boolean, key: K, value: V): Transactional<V> {
|
||||
return if (!isUnique && !weAreWriting(key)) {
|
||||
private fun transactionalForStoreResult(key: K, value: V, oldValue: V?): Transactional<V> {
|
||||
return if ( (oldValue != null) && !weAreWriting(key)) {
|
||||
// If we found a value already in the database, and we were not already writing, then it's already committed but got evicted.
|
||||
Transactional.Committed(value)
|
||||
Transactional.Committed(oldValue)
|
||||
} else {
|
||||
// Some database transactions, including us, writing, with readers seeing whatever is in the database and writers seeing the (in memory) value.
|
||||
Transactional.InFlight(this, key, _readerValueLoader = { loadValue(key) }).apply { alsoWrite(value) }
|
||||
|
@ -0,0 +1,79 @@
|
||||
package net.corda.node.services.persistence
|
||||
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.testing.internal.TestingNamedCacheFactory
|
||||
import net.corda.testing.internal.configureDatabase
|
||||
import net.corda.testing.node.MockServices
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.After
|
||||
import org.junit.Test
|
||||
import javax.persistence.Column
|
||||
import javax.persistence.Entity
|
||||
import javax.persistence.Id
|
||||
|
||||
class AppendOnlyPersistentMapNonConcurrentTest {
|
||||
|
||||
private val database = configureDatabase(MockServices.makeTestDataSourceProperties(),
|
||||
DatabaseConfig(),
|
||||
{ null }, { null },
|
||||
NodeSchemaService(setOf(MappedSchema(AppendOnlyPersistentMapTest::class.java, 1, listOf(AppendOnlyPersistentMapNonConcurrentTest.PersistentMapEntry::class.java)))))
|
||||
|
||||
@Entity
|
||||
@javax.persistence.Table(name = "persist_map_test")
|
||||
class PersistentMapEntry(
|
||||
@Id
|
||||
@Column(name = "key")
|
||||
var key: Long = -1,
|
||||
|
||||
@Column(name = "value", length = 16)
|
||||
var value: String = ""
|
||||
)
|
||||
|
||||
class TestMap(cacheSize: Long) : AppendOnlyPersistentMap<Long, String, PersistentMapEntry, Long>(
|
||||
cacheFactory = TestingNamedCacheFactory(cacheSize),
|
||||
name = "ApoendOnlyPersistentMap_test",
|
||||
toPersistentEntityKey = { it },
|
||||
fromPersistentEntity = { Pair(it.key, it.value) },
|
||||
toPersistentEntity = { key: Long, value: String ->
|
||||
PersistentMapEntry().apply {
|
||||
this.key = key
|
||||
this.value = value
|
||||
}
|
||||
},
|
||||
persistentEntityClass = PersistentMapEntry::class.java
|
||||
)
|
||||
|
||||
private fun createMap(cacheSize: Long) = TestMap(cacheSize)
|
||||
|
||||
@After
|
||||
fun closeDatabase() {
|
||||
database.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `map prevents duplicates, when key has been evicted from cache, but present in database`() {
|
||||
val map = database.transaction {
|
||||
createMap(1)
|
||||
}
|
||||
|
||||
|
||||
database.transaction {
|
||||
map.addWithDuplicatesAllowed(1, "1")
|
||||
map.addWithDuplicatesAllowed(3, "3")
|
||||
}
|
||||
|
||||
database.transaction {
|
||||
map.addWithDuplicatesAllowed(1, "2")
|
||||
}
|
||||
|
||||
val result = database.transaction {
|
||||
map[1]
|
||||
}
|
||||
|
||||
assertThat(result).isEqualTo("1")
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user