ENT-2924 Fix performance of identity checking. (#4515)

* Concentrate accesses to the identity cache.

* Fix tests to reflect change in semantics / method name.

* Delete unused method and tests.
This commit is contained in:
Rick Parker 2019-01-08 14:35:32 +00:00 committed by GitHub
parent 93f12f9b20
commit ce250cfafd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 33 additions and 82 deletions

View File

@ -188,12 +188,10 @@ class PersistentIdentityService(cacheFactory: NamedCacheFactory) : SingletonSeri
lateinit var ourNames: Set<CordaX500Name>
// Allows us to cheaply eliminate keys we know belong to others by using the cache contents without triggering loading.
fun stripCachedPeerKeys(keys: Iterable<PublicKey>): Iterable<PublicKey> {
return keys.filter {
val party = keyToParties.getIfCached(mapToKey(it))?.party?.name
party == null || party in ourNames
}
// Allows us to eliminate keys we know belong to others by using the cache contents that might have been seen during other identity activity.
// Concentrating activity on the identity cache works better than spreading checking across identity and key management, because we cache misses too.
fun stripNotOurKeys(keys: Iterable<PublicKey>): Iterable<PublicKey> {
return keys.filter { certificateFromKey(it)?.name in ourNames }
}
}

View File

@ -1,13 +1,13 @@
package net.corda.node.services.keys
import net.corda.core.crypto.*
import net.corda.core.crypto.internal.AliasPrivateKey
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.serialize
import net.corda.core.utilities.MAX_HASH_HEX_SIZE
import net.corda.node.services.identity.PersistentIdentityService
import net.corda.core.crypto.internal.AliasPrivateKey
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.nodeapi.internal.cryptoservice.CryptoService
import net.corda.nodeapi.internal.persistence.CordaPersistence
@ -103,7 +103,7 @@ class BasicHSMKeyManagementService(cacheFactory: NamedCacheFactory, val identity
}
override fun filterMyKeys(candidateKeys: Iterable<PublicKey>): Iterable<PublicKey> = database.transaction {
identityService.stripCachedPeerKeys(candidateKeys).filter { containsPublicKey(it) } // TODO: bulk cache access.
identityService.stripNotOurKeys(candidateKeys)
}
// Unlike initial keys, freshkey() is related confidential keys and it utilises platform's software key generation

View File

@ -71,7 +71,7 @@ class PersistentKeyManagementService(cacheFactory: NamedCacheFactory, val identi
override val keys: Set<PublicKey> get() = database.transaction { keysMap.allPersisted().map { it.first }.toSet() }
override fun filterMyKeys(candidateKeys: Iterable<PublicKey>): Iterable<PublicKey> = database.transaction {
identityService.stripCachedPeerKeys(candidateKeys).filter { keysMap[it] != null } // TODO: bulk cache access.
identityService.stripNotOurKeys(candidateKeys)
}
override fun freshKey(): PublicKey {

View File

@ -8,8 +8,7 @@ import net.corda.nodeapi.internal.persistence.DatabaseTransaction
import net.corda.nodeapi.internal.persistence.contextTransaction
import net.corda.nodeapi.internal.persistence.currentDBSession
import java.lang.ref.WeakReference
import java.util.HashSet
import java.util.NoSuchElementException
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
@ -137,19 +136,6 @@ abstract class AppendOnlyPersistentMapBase<K, V, E, out EK>(
operator fun contains(key: K) = get(key) != null
/**
* Allow checking the cache content without falling back to database if there's a miss.
*
* @param key The cache key
* @return The value in the cache, or null if not present.
*/
fun getIfCached(key: K): V? {
val transactional = cache.getIfPresent(key!!)
return if (transactional?.isPresent ?: false) {
transactional?.value
} else null
}
/**
* Removes all of the mappings from this map and underlying storage. The map will be empty after this call returns.
* WARNING!! The method is not thread safe.

View File

@ -7,7 +7,6 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.node.services.UnknownAnonymousPartyException
import net.corda.testing.internal.TestingNamedCacheFactory
import net.corda.nodeapi.internal.crypto.CertificateType
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.crypto.x509Certificates
@ -16,6 +15,7 @@ import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.core.*
import net.corda.testing.internal.DEV_INTERMEDIATE_CA
import net.corda.testing.internal.DEV_ROOT_CA
import net.corda.testing.internal.TestingNamedCacheFactory
import net.corda.testing.internal.configureDatabase
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import net.corda.testing.node.makeTestIdentityService
@ -96,21 +96,21 @@ class PersistentIdentityServiceTests {
}
@Test
fun `stripping others when none registered does not strip`() {
assertEquals(identityService.stripCachedPeerKeys(listOf(BOB_PUBKEY)).first(), BOB_PUBKEY)
fun `stripping others when none registered strips`() {
assertEquals(identityService.stripNotOurKeys(listOf(BOB_PUBKEY)).firstOrNull(), null)
}
@Test
fun `stripping others when only us registered does not strip`() {
fun `stripping others when only us registered strips`() {
identityService.verifyAndRegisterIdentity(ALICE_IDENTITY)
assertEquals(identityService.stripCachedPeerKeys(listOf(BOB_PUBKEY)).first(), BOB_PUBKEY)
assertEquals(identityService.stripNotOurKeys(listOf(BOB_PUBKEY)).firstOrNull(), null)
}
@Test
fun `stripping others when us and others registered does not strip us`() {
identityService.verifyAndRegisterIdentity(ALICE_IDENTITY)
identityService.verifyAndRegisterIdentity(BOB_IDENTITY)
val stripped = identityService.stripCachedPeerKeys(listOf(ALICE_PUBKEY, BOB_PUBKEY))
val stripped = identityService.stripNotOurKeys(listOf(ALICE_PUBKEY, BOB_PUBKEY))
assertEquals(stripped.single(), ALICE_PUBKEY)
}

View File

@ -9,9 +9,7 @@ import net.corda.testing.internal.TestingNamedCacheFactory
import net.corda.testing.internal.configureDatabase
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import org.junit.After
import org.junit.Assert.assertEquals
import org.junit.Assert.assertNull
import org.junit.Assert.assertTrue
import org.junit.Assert.*
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
@ -25,20 +23,20 @@ import javax.persistence.PersistenceException
class AppendOnlyPersistentMapTest(var scenario: Scenario) {
companion object {
private val scenarios = arrayOf<Scenario>(
Scenario(false, ReadOrWrite.Read, ReadOrWrite.Read, Outcome.Fail, Outcome.Fail, isCached = false),
Scenario(false, ReadOrWrite.Write, ReadOrWrite.Read, Outcome.Success, Outcome.Fail, Outcome.Success, null),
Scenario(false, ReadOrWrite.Read, ReadOrWrite.Write, Outcome.Fail, Outcome.Success, isCached = false),
Scenario(false, ReadOrWrite.Write, ReadOrWrite.Write, Outcome.Success, Outcome.SuccessButErrorOnCommit, isCached = null),
Scenario(false, ReadOrWrite.WriteDuplicateAllowed, ReadOrWrite.Read, Outcome.Success, Outcome.Fail, Outcome.Success, null),
Scenario(false, ReadOrWrite.Read, ReadOrWrite.WriteDuplicateAllowed, Outcome.Fail, Outcome.Success, isCached = true),
Scenario(false, ReadOrWrite.WriteDuplicateAllowed, ReadOrWrite.WriteDuplicateAllowed, Outcome.Success, Outcome.SuccessButErrorOnCommit, Outcome.Fail, null),
Scenario(true, ReadOrWrite.Read, ReadOrWrite.Read, Outcome.Success, Outcome.Success, isCached = false),
Scenario(true, ReadOrWrite.Write, ReadOrWrite.Read, Outcome.SuccessButErrorOnCommit, Outcome.Success, isCached = null),
Scenario(true, ReadOrWrite.Read, ReadOrWrite.Write, Outcome.Success, Outcome.Fail, isCached = true),
Scenario(true, ReadOrWrite.Write, ReadOrWrite.Write, Outcome.SuccessButErrorOnCommit, Outcome.SuccessButErrorOnCommit, isCached = null),
Scenario(true, ReadOrWrite.WriteDuplicateAllowed, ReadOrWrite.Read, Outcome.Fail, Outcome.Success, isCached = null),
Scenario(true, ReadOrWrite.Read, ReadOrWrite.WriteDuplicateAllowed, Outcome.Success, Outcome.Fail, isCached = true),
Scenario(true, ReadOrWrite.WriteDuplicateAllowed, ReadOrWrite.WriteDuplicateAllowed, Outcome.Fail, Outcome.Fail, isCached = null)
Scenario(false, ReadOrWrite.Read, ReadOrWrite.Read, Outcome.Fail, Outcome.Fail),
Scenario(false, ReadOrWrite.Write, ReadOrWrite.Read, Outcome.Success, Outcome.Fail, Outcome.Success),
Scenario(false, ReadOrWrite.Read, ReadOrWrite.Write, Outcome.Fail, Outcome.Success),
Scenario(false, ReadOrWrite.Write, ReadOrWrite.Write, Outcome.Success, Outcome.SuccessButErrorOnCommit),
Scenario(false, ReadOrWrite.WriteDuplicateAllowed, ReadOrWrite.Read, Outcome.Success, Outcome.Fail, Outcome.Success),
Scenario(false, ReadOrWrite.Read, ReadOrWrite.WriteDuplicateAllowed, Outcome.Fail, Outcome.Success),
Scenario(false, ReadOrWrite.WriteDuplicateAllowed, ReadOrWrite.WriteDuplicateAllowed, Outcome.Success, Outcome.SuccessButErrorOnCommit, Outcome.Fail),
Scenario(true, ReadOrWrite.Read, ReadOrWrite.Read, Outcome.Success, Outcome.Success),
Scenario(true, ReadOrWrite.Write, ReadOrWrite.Read, Outcome.SuccessButErrorOnCommit, Outcome.Success),
Scenario(true, ReadOrWrite.Read, ReadOrWrite.Write, Outcome.Success, Outcome.Fail),
Scenario(true, ReadOrWrite.Write, ReadOrWrite.Write, Outcome.SuccessButErrorOnCommit, Outcome.SuccessButErrorOnCommit),
Scenario(true, ReadOrWrite.WriteDuplicateAllowed, ReadOrWrite.Read, Outcome.Fail, Outcome.Success),
Scenario(true, ReadOrWrite.Read, ReadOrWrite.WriteDuplicateAllowed, Outcome.Success, Outcome.Fail),
Scenario(true, ReadOrWrite.WriteDuplicateAllowed, ReadOrWrite.WriteDuplicateAllowed, Outcome.Fail, Outcome.Fail)
)
@Parameterized.Parameters(name = "{0}")
@ -54,8 +52,7 @@ class AppendOnlyPersistentMapTest(var scenario: Scenario) {
val b: ReadOrWrite,
val aExpected: Outcome,
val bExpected: Outcome,
val bExpectedIfSingleThreaded: Outcome = bExpected,
val isCached: Boolean?)
val bExpectedIfSingleThreaded: Outcome = bExpected)
private val database = configureDatabase(makeTestDataSourceProperties(),
DatabaseConfig(),
@ -67,36 +64,6 @@ class AppendOnlyPersistentMapTest(var scenario: Scenario) {
database.close()
}
@Test
fun `getIfCached behaves as expected`() {
if (scenario.isCached != null) {
prepopulateIfRequired()
val map = createMap()
when (scenario.b) {
ReadOrWrite.Read -> { /* Do nothing */
}
ReadOrWrite.Write -> {
// Cause a read-thru
database.transaction { map.get(1) }
}
ReadOrWrite.WriteDuplicateAllowed -> {
// Write a value that overwrites anything pre-populated potentially.
database.transaction { map.addWithDuplicatesAllowed(1, "Y") }
}
}
val cachedValue = map.getIfCached(1)
val expectedValue = if (scenario.isCached!!) {
when (scenario.b) {
ReadOrWrite.Read -> throw IllegalStateException("Do nothing and isCached = true is not a valid combination.")
ReadOrWrite.Write -> "X"
ReadOrWrite.WriteDuplicateAllowed -> "Y"
}
} else null
assertEquals(expectedValue, cachedValue)
}
}
@Test
fun `concurrent test no purge between A and B`() {
prepopulateIfRequired()
@ -156,7 +123,7 @@ class AppendOnlyPersistentMapTest(var scenario: Scenario) {
@Test
fun `concurrent test purge between A and B`() {
// Writes intentionally do not check the database first, so purging between read and write changes behaviour
val remapped = mapOf(Scenario(true, ReadOrWrite.Read, ReadOrWrite.Write, Outcome.Success, Outcome.Fail, isCached = true) to Scenario(true, ReadOrWrite.Read, ReadOrWrite.Write, Outcome.Success, Outcome.SuccessButErrorOnCommit, isCached = true))
val remapped = mapOf(Scenario(true, ReadOrWrite.Read, ReadOrWrite.Write, Outcome.Success, Outcome.Fail) to Scenario(true, ReadOrWrite.Read, ReadOrWrite.Write, Outcome.Success, Outcome.SuccessButErrorOnCommit))
scenario = remapped[scenario] ?: scenario
prepopulateIfRequired()
val map = createMap()
@ -191,8 +158,8 @@ class AppendOnlyPersistentMapTest(var scenario: Scenario) {
fun `test purge mid-way in a single transaction`() {
// Writes intentionally do not check the database first, so purging between read and write changes behaviour
// Also, a purge after write causes the subsequent read to flush to the database, causing the read to generate a constraint violation when single threaded (in same database transaction).
val remapped = mapOf(Scenario(true, ReadOrWrite.Read, ReadOrWrite.Write, Outcome.Success, Outcome.Fail, isCached = true) to Scenario(true, ReadOrWrite.Read, ReadOrWrite.Write, Outcome.SuccessButErrorOnCommit, Outcome.SuccessButErrorOnCommit, isCached = true),
Scenario(true, ReadOrWrite.Write, ReadOrWrite.Read, Outcome.SuccessButErrorOnCommit, Outcome.Success, isCached = null) to Scenario(true, ReadOrWrite.Write, ReadOrWrite.Read, Outcome.SuccessButErrorOnCommit, Outcome.SuccessButErrorOnCommit, isCached = null))
val remapped = mapOf(Scenario(true, ReadOrWrite.Read, ReadOrWrite.Write, Outcome.Success, Outcome.Fail) to Scenario(true, ReadOrWrite.Read, ReadOrWrite.Write, Outcome.SuccessButErrorOnCommit, Outcome.SuccessButErrorOnCommit),
Scenario(true, ReadOrWrite.Write, ReadOrWrite.Read, Outcome.SuccessButErrorOnCommit, Outcome.Success) to Scenario(true, ReadOrWrite.Write, ReadOrWrite.Read, Outcome.SuccessButErrorOnCommit, Outcome.SuccessButErrorOnCommit))
scenario = remapped[scenario] ?: scenario
prepopulateIfRequired()
val map = createMap()