diff --git a/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt b/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt index 32cd247bb3..712d96f5ca 100644 --- a/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt +++ b/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt @@ -188,12 +188,10 @@ class PersistentIdentityService(cacheFactory: NamedCacheFactory) : SingletonSeri lateinit var ourNames: Set - // Allows us to cheaply eliminate keys we know belong to others by using the cache contents without triggering loading. - fun stripCachedPeerKeys(keys: Iterable): Iterable { - return keys.filter { - val party = keyToParties.getIfCached(mapToKey(it))?.party?.name - party == null || party in ourNames - } + // Allows us to eliminate keys we know belong to others by using the cache contents that might have been seen during other identity activity. + // Concentrating activity on the identity cache works better than spreading checking across identity and key management, because we cache misses too. + fun stripNotOurKeys(keys: Iterable): Iterable { + return keys.filter { certificateFromKey(it)?.name in ourNames } } } diff --git a/node/src/main/kotlin/net/corda/node/services/keys/BasicHSMKeyManagementService.kt b/node/src/main/kotlin/net/corda/node/services/keys/BasicHSMKeyManagementService.kt index ba5329daa6..bda7ca928f 100644 --- a/node/src/main/kotlin/net/corda/node/services/keys/BasicHSMKeyManagementService.kt +++ b/node/src/main/kotlin/net/corda/node/services/keys/BasicHSMKeyManagementService.kt @@ -1,13 +1,13 @@ package net.corda.node.services.keys import net.corda.core.crypto.* +import net.corda.core.crypto.internal.AliasPrivateKey import net.corda.core.identity.PartyAndCertificate import net.corda.core.internal.NamedCacheFactory import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.serialize import net.corda.core.utilities.MAX_HASH_HEX_SIZE import net.corda.node.services.identity.PersistentIdentityService -import net.corda.core.crypto.internal.AliasPrivateKey import net.corda.node.utilities.AppendOnlyPersistentMap import net.corda.nodeapi.internal.cryptoservice.CryptoService import net.corda.nodeapi.internal.persistence.CordaPersistence @@ -103,7 +103,7 @@ class BasicHSMKeyManagementService(cacheFactory: NamedCacheFactory, val identity } override fun filterMyKeys(candidateKeys: Iterable): Iterable = database.transaction { - identityService.stripCachedPeerKeys(candidateKeys).filter { containsPublicKey(it) } // TODO: bulk cache access. + identityService.stripNotOurKeys(candidateKeys) } // Unlike initial keys, freshkey() is related confidential keys and it utilises platform's software key generation diff --git a/node/src/main/kotlin/net/corda/node/services/keys/PersistentKeyManagementService.kt b/node/src/main/kotlin/net/corda/node/services/keys/PersistentKeyManagementService.kt index b3a3a7892f..74a022fadb 100644 --- a/node/src/main/kotlin/net/corda/node/services/keys/PersistentKeyManagementService.kt +++ b/node/src/main/kotlin/net/corda/node/services/keys/PersistentKeyManagementService.kt @@ -71,7 +71,7 @@ class PersistentKeyManagementService(cacheFactory: NamedCacheFactory, val identi override val keys: Set get() = database.transaction { keysMap.allPersisted().map { it.first }.toSet() } override fun filterMyKeys(candidateKeys: Iterable): Iterable = database.transaction { - identityService.stripCachedPeerKeys(candidateKeys).filter { keysMap[it] != null } // TODO: bulk cache access. + identityService.stripNotOurKeys(candidateKeys) } override fun freshKey(): PublicKey { diff --git a/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt b/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt index 84c344025c..0f15fae4b8 100644 --- a/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt +++ b/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt @@ -8,8 +8,7 @@ import net.corda.nodeapi.internal.persistence.DatabaseTransaction import net.corda.nodeapi.internal.persistence.contextTransaction import net.corda.nodeapi.internal.persistence.currentDBSession import java.lang.ref.WeakReference -import java.util.HashSet -import java.util.NoSuchElementException +import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicReference @@ -137,19 +136,6 @@ abstract class AppendOnlyPersistentMapBase( operator fun contains(key: K) = get(key) != null - /** - * Allow checking the cache content without falling back to database if there's a miss. - * - * @param key The cache key - * @return The value in the cache, or null if not present. - */ - fun getIfCached(key: K): V? { - val transactional = cache.getIfPresent(key!!) - return if (transactional?.isPresent ?: false) { - transactional?.value - } else null - } - /** * Removes all of the mappings from this map and underlying storage. The map will be empty after this call returns. * WARNING!! The method is not thread safe. diff --git a/node/src/test/kotlin/net/corda/node/services/identity/PersistentIdentityServiceTests.kt b/node/src/test/kotlin/net/corda/node/services/identity/PersistentIdentityServiceTests.kt index bf8a16880f..26886a011d 100644 --- a/node/src/test/kotlin/net/corda/node/services/identity/PersistentIdentityServiceTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/identity/PersistentIdentityServiceTests.kt @@ -7,7 +7,6 @@ import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.identity.PartyAndCertificate import net.corda.core.node.services.UnknownAnonymousPartyException -import net.corda.testing.internal.TestingNamedCacheFactory import net.corda.nodeapi.internal.crypto.CertificateType import net.corda.nodeapi.internal.crypto.X509Utilities import net.corda.nodeapi.internal.crypto.x509Certificates @@ -16,6 +15,7 @@ import net.corda.nodeapi.internal.persistence.DatabaseConfig import net.corda.testing.core.* import net.corda.testing.internal.DEV_INTERMEDIATE_CA import net.corda.testing.internal.DEV_ROOT_CA +import net.corda.testing.internal.TestingNamedCacheFactory import net.corda.testing.internal.configureDatabase import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties import net.corda.testing.node.makeTestIdentityService @@ -96,21 +96,21 @@ class PersistentIdentityServiceTests { } @Test - fun `stripping others when none registered does not strip`() { - assertEquals(identityService.stripCachedPeerKeys(listOf(BOB_PUBKEY)).first(), BOB_PUBKEY) + fun `stripping others when none registered strips`() { + assertEquals(identityService.stripNotOurKeys(listOf(BOB_PUBKEY)).firstOrNull(), null) } @Test - fun `stripping others when only us registered does not strip`() { + fun `stripping others when only us registered strips`() { identityService.verifyAndRegisterIdentity(ALICE_IDENTITY) - assertEquals(identityService.stripCachedPeerKeys(listOf(BOB_PUBKEY)).first(), BOB_PUBKEY) + assertEquals(identityService.stripNotOurKeys(listOf(BOB_PUBKEY)).firstOrNull(), null) } @Test fun `stripping others when us and others registered does not strip us`() { identityService.verifyAndRegisterIdentity(ALICE_IDENTITY) identityService.verifyAndRegisterIdentity(BOB_IDENTITY) - val stripped = identityService.stripCachedPeerKeys(listOf(ALICE_PUBKEY, BOB_PUBKEY)) + val stripped = identityService.stripNotOurKeys(listOf(ALICE_PUBKEY, BOB_PUBKEY)) assertEquals(stripped.single(), ALICE_PUBKEY) } diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/AppendOnlyPersistentMapTest.kt b/node/src/test/kotlin/net/corda/node/services/persistence/AppendOnlyPersistentMapTest.kt index 05edf376ff..a054ae24a3 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/AppendOnlyPersistentMapTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/AppendOnlyPersistentMapTest.kt @@ -9,9 +9,7 @@ import net.corda.testing.internal.TestingNamedCacheFactory import net.corda.testing.internal.configureDatabase import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties import org.junit.After -import org.junit.Assert.assertEquals -import org.junit.Assert.assertNull -import org.junit.Assert.assertTrue +import org.junit.Assert.* import org.junit.Test import org.junit.runner.RunWith import org.junit.runners.Parameterized @@ -25,20 +23,20 @@ import javax.persistence.PersistenceException class AppendOnlyPersistentMapTest(var scenario: Scenario) { companion object { private val scenarios = arrayOf( - Scenario(false, ReadOrWrite.Read, ReadOrWrite.Read, Outcome.Fail, Outcome.Fail, isCached = false), - Scenario(false, ReadOrWrite.Write, ReadOrWrite.Read, Outcome.Success, Outcome.Fail, Outcome.Success, null), - Scenario(false, ReadOrWrite.Read, ReadOrWrite.Write, Outcome.Fail, Outcome.Success, isCached = false), - Scenario(false, ReadOrWrite.Write, ReadOrWrite.Write, Outcome.Success, Outcome.SuccessButErrorOnCommit, isCached = null), - Scenario(false, ReadOrWrite.WriteDuplicateAllowed, ReadOrWrite.Read, Outcome.Success, Outcome.Fail, Outcome.Success, null), - Scenario(false, ReadOrWrite.Read, ReadOrWrite.WriteDuplicateAllowed, Outcome.Fail, Outcome.Success, isCached = true), - Scenario(false, ReadOrWrite.WriteDuplicateAllowed, ReadOrWrite.WriteDuplicateAllowed, Outcome.Success, Outcome.SuccessButErrorOnCommit, Outcome.Fail, null), - Scenario(true, ReadOrWrite.Read, ReadOrWrite.Read, Outcome.Success, Outcome.Success, isCached = false), - Scenario(true, ReadOrWrite.Write, ReadOrWrite.Read, Outcome.SuccessButErrorOnCommit, Outcome.Success, isCached = null), - Scenario(true, ReadOrWrite.Read, ReadOrWrite.Write, Outcome.Success, Outcome.Fail, isCached = true), - Scenario(true, ReadOrWrite.Write, ReadOrWrite.Write, Outcome.SuccessButErrorOnCommit, Outcome.SuccessButErrorOnCommit, isCached = null), - Scenario(true, ReadOrWrite.WriteDuplicateAllowed, ReadOrWrite.Read, Outcome.Fail, Outcome.Success, isCached = null), - Scenario(true, ReadOrWrite.Read, ReadOrWrite.WriteDuplicateAllowed, Outcome.Success, Outcome.Fail, isCached = true), - Scenario(true, ReadOrWrite.WriteDuplicateAllowed, ReadOrWrite.WriteDuplicateAllowed, Outcome.Fail, Outcome.Fail, isCached = null) + Scenario(false, ReadOrWrite.Read, ReadOrWrite.Read, Outcome.Fail, Outcome.Fail), + Scenario(false, ReadOrWrite.Write, ReadOrWrite.Read, Outcome.Success, Outcome.Fail, Outcome.Success), + Scenario(false, ReadOrWrite.Read, ReadOrWrite.Write, Outcome.Fail, Outcome.Success), + Scenario(false, ReadOrWrite.Write, ReadOrWrite.Write, Outcome.Success, Outcome.SuccessButErrorOnCommit), + Scenario(false, ReadOrWrite.WriteDuplicateAllowed, ReadOrWrite.Read, Outcome.Success, Outcome.Fail, Outcome.Success), + Scenario(false, ReadOrWrite.Read, ReadOrWrite.WriteDuplicateAllowed, Outcome.Fail, Outcome.Success), + Scenario(false, ReadOrWrite.WriteDuplicateAllowed, ReadOrWrite.WriteDuplicateAllowed, Outcome.Success, Outcome.SuccessButErrorOnCommit, Outcome.Fail), + Scenario(true, ReadOrWrite.Read, ReadOrWrite.Read, Outcome.Success, Outcome.Success), + Scenario(true, ReadOrWrite.Write, ReadOrWrite.Read, Outcome.SuccessButErrorOnCommit, Outcome.Success), + Scenario(true, ReadOrWrite.Read, ReadOrWrite.Write, Outcome.Success, Outcome.Fail), + Scenario(true, ReadOrWrite.Write, ReadOrWrite.Write, Outcome.SuccessButErrorOnCommit, Outcome.SuccessButErrorOnCommit), + Scenario(true, ReadOrWrite.WriteDuplicateAllowed, ReadOrWrite.Read, Outcome.Fail, Outcome.Success), + Scenario(true, ReadOrWrite.Read, ReadOrWrite.WriteDuplicateAllowed, Outcome.Success, Outcome.Fail), + Scenario(true, ReadOrWrite.WriteDuplicateAllowed, ReadOrWrite.WriteDuplicateAllowed, Outcome.Fail, Outcome.Fail) ) @Parameterized.Parameters(name = "{0}") @@ -54,8 +52,7 @@ class AppendOnlyPersistentMapTest(var scenario: Scenario) { val b: ReadOrWrite, val aExpected: Outcome, val bExpected: Outcome, - val bExpectedIfSingleThreaded: Outcome = bExpected, - val isCached: Boolean?) + val bExpectedIfSingleThreaded: Outcome = bExpected) private val database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), @@ -67,36 +64,6 @@ class AppendOnlyPersistentMapTest(var scenario: Scenario) { database.close() } - @Test - fun `getIfCached behaves as expected`() { - if (scenario.isCached != null) { - prepopulateIfRequired() - val map = createMap() - when (scenario.b) { - ReadOrWrite.Read -> { /* Do nothing */ - } - ReadOrWrite.Write -> { - // Cause a read-thru - database.transaction { map.get(1) } - } - ReadOrWrite.WriteDuplicateAllowed -> { - // Write a value that overwrites anything pre-populated potentially. - database.transaction { map.addWithDuplicatesAllowed(1, "Y") } - } - } - - val cachedValue = map.getIfCached(1) - val expectedValue = if (scenario.isCached!!) { - when (scenario.b) { - ReadOrWrite.Read -> throw IllegalStateException("Do nothing and isCached = true is not a valid combination.") - ReadOrWrite.Write -> "X" - ReadOrWrite.WriteDuplicateAllowed -> "Y" - } - } else null - assertEquals(expectedValue, cachedValue) - } - } - @Test fun `concurrent test no purge between A and B`() { prepopulateIfRequired() @@ -156,7 +123,7 @@ class AppendOnlyPersistentMapTest(var scenario: Scenario) { @Test fun `concurrent test purge between A and B`() { // Writes intentionally do not check the database first, so purging between read and write changes behaviour - val remapped = mapOf(Scenario(true, ReadOrWrite.Read, ReadOrWrite.Write, Outcome.Success, Outcome.Fail, isCached = true) to Scenario(true, ReadOrWrite.Read, ReadOrWrite.Write, Outcome.Success, Outcome.SuccessButErrorOnCommit, isCached = true)) + val remapped = mapOf(Scenario(true, ReadOrWrite.Read, ReadOrWrite.Write, Outcome.Success, Outcome.Fail) to Scenario(true, ReadOrWrite.Read, ReadOrWrite.Write, Outcome.Success, Outcome.SuccessButErrorOnCommit)) scenario = remapped[scenario] ?: scenario prepopulateIfRequired() val map = createMap() @@ -191,8 +158,8 @@ class AppendOnlyPersistentMapTest(var scenario: Scenario) { fun `test purge mid-way in a single transaction`() { // Writes intentionally do not check the database first, so purging between read and write changes behaviour // Also, a purge after write causes the subsequent read to flush to the database, causing the read to generate a constraint violation when single threaded (in same database transaction). - val remapped = mapOf(Scenario(true, ReadOrWrite.Read, ReadOrWrite.Write, Outcome.Success, Outcome.Fail, isCached = true) to Scenario(true, ReadOrWrite.Read, ReadOrWrite.Write, Outcome.SuccessButErrorOnCommit, Outcome.SuccessButErrorOnCommit, isCached = true), - Scenario(true, ReadOrWrite.Write, ReadOrWrite.Read, Outcome.SuccessButErrorOnCommit, Outcome.Success, isCached = null) to Scenario(true, ReadOrWrite.Write, ReadOrWrite.Read, Outcome.SuccessButErrorOnCommit, Outcome.SuccessButErrorOnCommit, isCached = null)) + val remapped = mapOf(Scenario(true, ReadOrWrite.Read, ReadOrWrite.Write, Outcome.Success, Outcome.Fail) to Scenario(true, ReadOrWrite.Read, ReadOrWrite.Write, Outcome.SuccessButErrorOnCommit, Outcome.SuccessButErrorOnCommit), + Scenario(true, ReadOrWrite.Write, ReadOrWrite.Read, Outcome.SuccessButErrorOnCommit, Outcome.Success) to Scenario(true, ReadOrWrite.Write, ReadOrWrite.Read, Outcome.SuccessButErrorOnCommit, Outcome.SuccessButErrorOnCommit)) scenario = remapped[scenario] ?: scenario prepopulateIfRequired() val map = createMap()