AppendOnlyPersistentMapBase.allPersisted no longer loads everything into memory at once (#5286)

As a general-purpose API, allPersisted should not load the entire contents of the database table into memory. Instead, it now returns a Stream through which the elements can be processed.
This commit is contained in:
Shams Asari 2019-07-10 18:09:02 +01:00 committed by GitHub
parent 76631132ca
commit 49cda57c81
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 66 additions and 48 deletions

View File

@ -45,12 +45,15 @@ import java.util.*
import java.util.Spliterator.*
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
import java.util.stream.Collectors
import java.util.stream.Collectors.toCollection
import java.util.stream.IntStream
import java.util.stream.Stream
import java.util.stream.StreamSupport
import java.util.zip.Deflater
import java.util.zip.ZipEntry
import java.util.zip.ZipOutputStream
import kotlin.collections.LinkedHashSet
import kotlin.reflect.KClass
import kotlin.reflect.full.createInstance
@ -273,6 +276,9 @@ inline fun <T, R : Any> Stream<T>.mapNotNull(crossinline transform: (T) -> R?):
}
}
/**
 * Collects the elements of this stream into a [LinkedHashSet]. This mirrors [Collectors.toSet], with the difference
 * that the returned Set is backed by a collection which has a defined iteration order.
 */
fun <T> Stream<T>.toSet(): Set<T> {
    return this.collect(toCollection<T, LinkedHashSet<T>> { LinkedHashSet() })
}
/** Returns [obj] cast to the receiver [Class] when [obj] is an instance of it, or `null` when it is not. */
fun <T> Class<T>.castIfPossible(obj: Any): T? {
    return when {
        isInstance(obj) -> cast(obj)
        else -> null
    }
}
/** Returns a [DeclaredField] wrapper around the declared (possibly non-public) static field of the receiver [Class]. */

View File

@ -115,13 +115,13 @@ class DBNetworkParametersStorage(
val currentParameters = lookup(currentHash)
?: throw IllegalStateException("Unable to obtain NotaryInfo current network parameters not set.")
val inCurrentParams = currentParameters.notaries.singleOrNull { it.identity == party }
if (inCurrentParams == null) {
val inOldParams = hashToParameters.allPersisted().flatMap { (_, signedParams) ->
val parameters = signedParams.raw.deserialize()
parameters.notaries.asSequence()
}.firstOrNull { it.identity == party }
return inOldParams
} else return inCurrentParams
if (inCurrentParams != null) return inCurrentParams
return hashToParameters.allPersisted.use {
it.flatMap { (_, signedNetParams) -> signedNetParams.raw.deserialize().notaries.stream() }
.filter { it.identity == party }
.findFirst()
.orElse(null)
}
}
@Entity

View File

@ -26,6 +26,7 @@ import sun.reflect.generics.reflectiveObjects.NotImplementedException
import java.nio.file.Paths
import java.time.Clock
import java.time.Duration
import java.util.Comparator.comparingInt
class MigrationServicesForResolution(
override val identityService: IdentityService,
@ -87,8 +88,10 @@ class MigrationServicesForResolution(
private val filedParams = getNetworkParametersFromFile()
override val defaultHash: SecureHash = filedParams?.raw?.hash ?: SecureHash.getZeroHash()
override val currentHash: SecureHash = cordaDB.transaction {
storage.allPersisted().maxBy { it.second.verified().epoch }?.first ?: defaultHash
override val currentHash: SecureHash = cordaDB.transaction {
storage.allPersisted.use {
it.max(comparingInt { it.second.verified().epoch }).map { it.first }.orElse(defaultHash)
}
}
override fun lookup(hash: SecureHash): NetworkParameters? {

View File

@ -4,12 +4,12 @@ import net.corda.core.crypto.SecureHash
import net.corda.core.identity.*
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.internal.hash
import net.corda.core.internal.toSet
import net.corda.core.node.services.UnknownAnonymousPartyException
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.MAX_HASH_HEX_SIZE
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.toBase58String
import net.corda.node.services.api.IdentityServiceInternal
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.nodeapi.internal.crypto.X509CertificateFactory
@ -25,6 +25,7 @@ import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Id
import javax.persistence.Lob
import kotlin.streams.toList
/**
* An identity service that stores parties and their identities in key-value tables in the database. The entries are
@ -171,8 +172,10 @@ class PersistentIdentityService(cacheFactory: NamedCacheFactory) : SingletonSeri
}
// We give the caller a copy of the data set to avoid any locking problems
override fun getAllIdentities(): Iterable<PartyAndCertificate> = database.transaction {
keyToParties.allPersisted().map { it.second }.asIterable()
override fun getAllIdentities(): Iterable<PartyAndCertificate> {
return database.transaction {
keyToParties.allPersisted.use { it.map { it.second }.toList() }
}
}
override fun wellKnownPartyFromX500Name(name: CordaX500Name): Party? = certificateFromCordaX500Name(name)?.party
@ -190,13 +193,9 @@ class PersistentIdentityService(cacheFactory: NamedCacheFactory) : SingletonSeri
override fun partiesFromName(query: String, exactMatch: Boolean): Set<Party> {
return database.transaction {
val results = LinkedHashSet<Party>()
principalToParties.allPersisted().forEach { (x500name, partyId) ->
if (x500Matches(query, exactMatch, x500name)) {
results += keyToParties[partyId]!!.party
}
principalToParties.allPersisted.use {
it.filter { x500Matches(query, exactMatch, it.first) }.map { keyToParties[it.second]!!.party }.toSet()
}
results
}
}

View File

@ -19,6 +19,7 @@ import java.security.PrivateKey
import java.security.PublicKey
import java.util.*
import javax.persistence.*
import kotlin.collections.LinkedHashSet
/**
* A persistent re-implementation of [E2ETestKeyManagementService] to support CryptoService for initial keys and
@ -76,7 +77,13 @@ class BasicHSMKeyManagementService(cacheFactory: NamedCacheFactory, val identity
}
}
override val keys: Set<PublicKey> get() = database.transaction { originalKeysMap.keys.plus(keysMap.allPersisted().map { it.first }.toSet()) }
override val keys: Set<PublicKey> get() {
return database.transaction {
val set = LinkedHashSet<PublicKey>(originalKeysMap.keys)
keysMap.allPersisted.use { it.forEach { set += it.first } }
set
}
}
private fun containsPublicKey(publicKey: PublicKey): Boolean {
return (publicKey in originalKeysMap || publicKey in keysMap)

View File

@ -3,6 +3,7 @@ package net.corda.node.services.keys
import net.corda.core.crypto.*
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.internal.toSet
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.MAX_HASH_HEX_SIZE
import net.corda.node.services.identity.PersistentIdentityService
@ -69,7 +70,7 @@ class PersistentKeyManagementService(cacheFactory: NamedCacheFactory, val identi
initialKeyPairs.forEach { keysMap.addWithDuplicatesAllowed(it.public, it.private) }
}
override val keys: Set<PublicKey> get() = database.transaction { keysMap.allPersisted().map { it.first }.toSet() }
override val keys: Set<PublicKey> get() = database.transaction { keysMap.allPersisted.use { it.map { it.first }.toSet() } }
override fun filterMyKeys(candidateKeys: Iterable<PublicKey>): Iterable<PublicKey> = database.transaction {
identityService.stripNotOurKeys(candidateKeys)

View File

@ -26,6 +26,7 @@ import org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY
import rx.Observable
import rx.subjects.PublishSubject
import javax.persistence.*
import kotlin.streams.toList
// cache value type to just store the immutable bits of a signed transaction plus conversion helpers
typealias TxCacheValue = Pair<SerializedBytes<CoreTransaction>, List<TransactionSignature>>
@ -88,10 +89,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
private const val transactionSignatureOverheadEstimate = 1024
private fun weighTx(tx: AppendOnlyPersistentMapBase.Transactional<TxCacheValue>): Int {
val actTx = tx.peekableValue
if (actTx == null) {
return 0
}
val actTx = tx.peekableValue ?: return 0
return actTx.second.sumBy { it.size + transactionSignatureOverheadEstimate } + actTx.first.size
}
}
@ -114,7 +112,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
override fun track(): DataFeed<List<SignedTransaction>, SignedTransaction> {
return database.transaction {
txStorage.locked {
DataFeed(allPersisted().map { it.second.toSignedTx() }.toList(), updates.bufferUntilSubscribed())
DataFeed(snapshot(), updates.bufferUntilSubscribed())
}
}
}
@ -133,6 +131,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
}
@VisibleForTesting
val transactions: Iterable<SignedTransaction>
get() = database.transaction { txStorage.content.allPersisted().map { it.second.toSignedTx() }.toList() }
val transactions: List<SignedTransaction> get() = database.transaction { snapshot() }
private fun snapshot() = txStorage.content.allPersisted.use { it.map { it.second.toSignedTx() }.toList() }
}

View File

@ -12,6 +12,7 @@ import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import java.util.stream.Stream
/**
* Implements a caching layer on top of an *append-only* table accessed via Hibernate mapping. Note that if the same key is [set] twice,
@ -36,24 +37,24 @@ abstract class AppendOnlyPersistentMapBase<K, V, E, out EK>(
/**
* Returns the value associated with the key, first loading that value from the storage if necessary.
*/
operator fun get(key: K): V? {
return cache.get(key)!!.orElse(null)
}
operator fun get(key: K): V? = cache.get(key)?.orElse(null)
val size get() = allPersisted().toList().size
val size: Long get() = allPersisted.use { it.count() }
/**
* Returns all key/value pairs from the underlying storage.
* Returns all key/value pairs from the underlying storage in a [Stream].
*
* Make sure to close the [Stream] once it's been processed.
*/
fun allPersisted(): Sequence<Pair<K, V>> {
val session = currentDBSession()
val criteriaQuery = session.criteriaBuilder.createQuery(persistentEntityClass)
val root = criteriaQuery.from(persistentEntityClass)
criteriaQuery.select(root)
val query = session.createQuery(criteriaQuery)
val result = query.resultList
return result.map { x -> fromPersistentEntity(x) }.asSequence()
}
val allPersisted: Stream<Pair<K, V>>
get() {
val session = currentDBSession()
val criteriaQuery = session.criteriaBuilder.createQuery(persistentEntityClass)
val root = criteriaQuery.from(persistentEntityClass)
criteriaQuery.select(root)
val query = session.createQuery(criteriaQuery)
return query.stream().map(fromPersistentEntity)
}
private fun set(key: K, value: V, logWarning: Boolean, store: (K, V) -> V?): Boolean {
// Will be set to true if store says it isn't in the database.
@ -157,7 +158,7 @@ abstract class AppendOnlyPersistentMapBase<K, V, E, out EK>(
Transactional.InFlight(this, key, { loadValue(key) }, { loadValue(key)!! })
} else {
// If no one is writing, then the value may or may not exist in the database.
Transactional.Unknown(this, key, { loadValue(key) })
Transactional.Unknown(this, key) { loadValue(key) }
}
}

View File

@ -335,7 +335,7 @@ object BFTSmart {
// LinkedHashMap for deterministic serialisation
val committedStates = LinkedHashMap<StateRef, SecureHash>()
val requests = services.database.transaction {
commitLog.allPersisted().forEach { committedStates[it.first] = it.second }
commitLog.allPersisted.use { it.forEach { committedStates[it.first] = it.second } }
val criteriaQuery = session.criteriaBuilder.createQuery(PersistentUniquenessProvider.Request::class.java)
criteriaQuery.select(criteriaQuery.from(PersistentUniquenessProvider.Request::class.java))
session.createQuery(criteriaQuery).resultList

View File

@ -175,11 +175,13 @@ class RaftTransactionCommitLog<E, EK>(
*/
override fun snapshot(writer: SnapshotWriter) {
db.transaction {
writer.writeInt(map.size)
map.allPersisted().forEach {
val bytes = it.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes
writer.writeUnsignedShort(bytes.size)
writer.writeObject(bytes)
writer.writeInt(map.size.toInt())
map.allPersisted.use {
it.forEach {
val bytes = it.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes
writer.writeUnsignedShort(bytes.size)
writer.writeObject(bytes)
}
}
val criteriaQuery = session.criteriaBuilder.createQuery(PersistentUniquenessProvider.Request::class.java)