From 49cda57c81af0df3f2d34f0441b9e5bf981ac235 Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Wed, 10 Jul 2019 18:09:02 +0100 Subject: [PATCH] AppendOnlyPersistentMapBase.allPersisted no longer loads everything into memory at once (#5286) As a general purpose API, allPersisted should not be loading the entire contents of the database table into memory. Instead now it returns a Stream for processing of elements. --- .../net/corda/core/internal/InternalUtils.kt | 6 ++++ .../internal/DBNetworkParametersStorage.kt | 14 ++++----- .../MigrationServicesForResolution.kt | 7 +++-- .../identity/PersistentIdentityService.kt | 17 +++++----- .../keys/BasicHSMKeyManagementService.kt | 9 +++++- .../keys/PersistentKeyManagementService.kt | 3 +- .../persistence/DBTransactionStorage.kt | 13 ++++---- .../node/utilities/AppendOnlyPersistentMap.kt | 31 ++++++++++--------- .../notary/experimental/bftsmart/BFTSmart.kt | 2 +- .../raft/RaftTransactionCommitLog.kt | 12 ++++--- 10 files changed, 66 insertions(+), 48 deletions(-) diff --git a/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt b/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt index d93509c873..d97e23a4dd 100644 --- a/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt +++ b/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt @@ -45,12 +45,15 @@ import java.util.* import java.util.Spliterator.* import java.util.concurrent.ExecutorService import java.util.concurrent.TimeUnit +import java.util.stream.Collectors +import java.util.stream.Collectors.toCollection import java.util.stream.IntStream import java.util.stream.Stream import java.util.stream.StreamSupport import java.util.zip.Deflater import java.util.zip.ZipEntry import java.util.zip.ZipOutputStream +import kotlin.collections.LinkedHashSet import kotlin.reflect.KClass import kotlin.reflect.full.createInstance @@ -273,6 +276,9 @@ inline fun Stream.mapNotNull(crossinline transform: (T) -> R?): } } +/** Similar to [Collectors.toSet] except the Set is guaranteed to be ordered. */ +fun Stream.toSet(): Set = collect(toCollection { LinkedHashSet() }) + fun Class.castIfPossible(obj: Any): T? = if (isInstance(obj)) cast(obj) else null /** Returns a [DeclaredField] wrapper around the declared (possibly non-public) static field of the receiver [Class]. */ diff --git a/node/src/main/kotlin/net/corda/node/internal/DBNetworkParametersStorage.kt b/node/src/main/kotlin/net/corda/node/internal/DBNetworkParametersStorage.kt index 1d2b95d702..e777bf7dcc 100644 --- a/node/src/main/kotlin/net/corda/node/internal/DBNetworkParametersStorage.kt +++ b/node/src/main/kotlin/net/corda/node/internal/DBNetworkParametersStorage.kt @@ -115,13 +115,13 @@ class DBNetworkParametersStorage( val currentParameters = lookup(currentHash) ?: throw IllegalStateException("Unable to obtain NotaryInfo – current network parameters not set.") val inCurrentParams = currentParameters.notaries.singleOrNull { it.identity == party } - if (inCurrentParams == null) { - val inOldParams = hashToParameters.allPersisted().flatMap { (_, signedParams) -> - val parameters = signedParams.raw.deserialize() - parameters.notaries.asSequence() - }.firstOrNull { it.identity == party } - return inOldParams - } else return inCurrentParams + if (inCurrentParams != null) return inCurrentParams + return hashToParameters.allPersisted.use { + it.flatMap { (_, signedNetParams) -> signedNetParams.raw.deserialize().notaries.stream() } + .filter { it.identity == party } + .findFirst() + .orElse(null) + } } @Entity diff --git a/node/src/main/kotlin/net/corda/node/migration/MigrationServicesForResolution.kt b/node/src/main/kotlin/net/corda/node/migration/MigrationServicesForResolution.kt index 763bbb0e11..8f097c308b 100644 --- a/node/src/main/kotlin/net/corda/node/migration/MigrationServicesForResolution.kt +++ b/node/src/main/kotlin/net/corda/node/migration/MigrationServicesForResolution.kt @@ -26,6 +26,7 @@ import sun.reflect.generics.reflectiveObjects.NotImplementedException import java.nio.file.Paths import java.time.Clock import java.time.Duration +import java.util.Comparator.comparingInt class MigrationServicesForResolution( override val identityService: IdentityService, @@ -87,8 +88,10 @@ class MigrationServicesForResolution( private val filedParams = getNetworkParametersFromFile() override val defaultHash: SecureHash = filedParams?.raw?.hash ?: SecureHash.getZeroHash() - override val currentHash: SecureHash = cordaDB.transaction { - storage.allPersisted().maxBy { it.second.verified().epoch }?.first ?: defaultHash + override val currentHash: SecureHash = cordaDB.transaction { + storage.allPersisted.use { + it.max(comparingInt { it.second.verified().epoch }).map { it.first }.orElse(defaultHash) + } } override fun lookup(hash: SecureHash): NetworkParameters? { diff --git a/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt b/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt index 56ebdf98a5..990868473b 100644 --- a/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt +++ b/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt @@ -4,12 +4,12 @@ import net.corda.core.crypto.SecureHash import net.corda.core.identity.* import net.corda.core.internal.NamedCacheFactory import net.corda.core.internal.hash +import net.corda.core.internal.toSet import net.corda.core.node.services.UnknownAnonymousPartyException import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.MAX_HASH_HEX_SIZE import net.corda.core.utilities.contextLogger import net.corda.core.utilities.debug -import net.corda.core.utilities.toBase58String import net.corda.node.services.api.IdentityServiceInternal import net.corda.node.utilities.AppendOnlyPersistentMap import net.corda.nodeapi.internal.crypto.X509CertificateFactory @@ -25,6 +25,7 @@ import javax.persistence.Column import javax.persistence.Entity import javax.persistence.Id import javax.persistence.Lob +import kotlin.streams.toList /** * An identity service that stores parties and their identities to a key value tables in the database. The entries are @@ -171,8 +172,10 @@ class PersistentIdentityService(cacheFactory: NamedCacheFactory) : SingletonSeri } // We give the caller a copy of the data set to avoid any locking problems - override fun getAllIdentities(): Iterable = database.transaction { - keyToParties.allPersisted().map { it.second }.asIterable() + override fun getAllIdentities(): Iterable { + return database.transaction { + keyToParties.allPersisted.use { it.map { it.second }.toList() } + } } override fun wellKnownPartyFromX500Name(name: CordaX500Name): Party? = certificateFromCordaX500Name(name)?.party @@ -190,13 +193,9 @@ class PersistentIdentityService(cacheFactory: NamedCacheFactory) : SingletonSeri override fun partiesFromName(query: String, exactMatch: Boolean): Set { return database.transaction { - val results = LinkedHashSet() - principalToParties.allPersisted().forEach { (x500name, partyId) -> - if (x500Matches(query, exactMatch, x500name)) { - results += keyToParties[partyId]!!.party - } + principalToParties.allPersisted.use { + it.filter { x500Matches(query, exactMatch, it.first) }.map { keyToParties[it.second]!!.party }.toSet() } - results } } diff --git a/node/src/main/kotlin/net/corda/node/services/keys/BasicHSMKeyManagementService.kt b/node/src/main/kotlin/net/corda/node/services/keys/BasicHSMKeyManagementService.kt index f3dac99eaf..5f3014455d 100644 --- a/node/src/main/kotlin/net/corda/node/services/keys/BasicHSMKeyManagementService.kt +++ b/node/src/main/kotlin/net/corda/node/services/keys/BasicHSMKeyManagementService.kt @@ -19,6 +19,7 @@ import java.security.PrivateKey import java.security.PublicKey import java.util.* import javax.persistence.* +import kotlin.collections.LinkedHashSet /** * A persistent re-implementation of [E2ETestKeyManagementService] to support CryptoService for initial keys and @@ -76,7 +77,13 @@ class BasicHSMKeyManagementService(cacheFactory: NamedCacheFactory, val identity } } - override val keys: Set get() = database.transaction { originalKeysMap.keys.plus(keysMap.allPersisted().map { it.first }.toSet()) } + override val keys: Set get() { + return database.transaction { + val set = LinkedHashSet(originalKeysMap.keys) + keysMap.allPersisted.use { it.forEach { set += it.first } } + set + } + } private fun containsPublicKey(publicKey: PublicKey): Boolean { return (publicKey in originalKeysMap || publicKey in keysMap) diff --git a/node/src/main/kotlin/net/corda/node/services/keys/PersistentKeyManagementService.kt b/node/src/main/kotlin/net/corda/node/services/keys/PersistentKeyManagementService.kt index 878a0243b2..9095df45be 100644 --- a/node/src/main/kotlin/net/corda/node/services/keys/PersistentKeyManagementService.kt +++ b/node/src/main/kotlin/net/corda/node/services/keys/PersistentKeyManagementService.kt @@ -3,6 +3,7 @@ package net.corda.node.services.keys import net.corda.core.crypto.* import net.corda.core.identity.PartyAndCertificate import net.corda.core.internal.NamedCacheFactory +import net.corda.core.internal.toSet import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.MAX_HASH_HEX_SIZE import net.corda.node.services.identity.PersistentIdentityService @@ -69,7 +70,7 @@ class PersistentKeyManagementService(cacheFactory: NamedCacheFactory, val identi initialKeyPairs.forEach { keysMap.addWithDuplicatesAllowed(it.public, it.private) } } - override val keys: Set get() = database.transaction { keysMap.allPersisted().map { it.first }.toSet() } + override val keys: Set get() = database.transaction { keysMap.allPersisted.use { it.map { it.first }.toSet() } } override fun filterMyKeys(candidateKeys: Iterable): Iterable = database.transaction { identityService.stripNotOurKeys(candidateKeys) diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt index 0f0bacde12..4efc8ed72d 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt @@ -26,6 +26,7 @@ import org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY import rx.Observable import rx.subjects.PublishSubject import javax.persistence.* +import kotlin.streams.toList // cache value type to just store the immutable bits of a signed transaction plus conversion helpers typealias TxCacheValue = Pair, List> @@ -88,10 +89,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: private const val transactionSignatureOverheadEstimate = 1024 private fun weighTx(tx: AppendOnlyPersistentMapBase.Transactional): Int { - val actTx = tx.peekableValue - if (actTx == null) { - return 0 - } + val actTx = tx.peekableValue ?: return 0 return actTx.second.sumBy { it.size + transactionSignatureOverheadEstimate } + actTx.first.size } } @@ -114,7 +112,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: override fun track(): DataFeed, SignedTransaction> { return database.transaction { txStorage.locked { - DataFeed(allPersisted().map { it.second.toSignedTx() }.toList(), updates.bufferUntilSubscribed()) + DataFeed(snapshot(), updates.bufferUntilSubscribed()) } } } @@ -133,6 +131,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: } @VisibleForTesting - val transactions: Iterable - get() = database.transaction { txStorage.content.allPersisted().map { it.second.toSignedTx() }.toList() } + val transactions: List get() = database.transaction { snapshot() } + + private fun snapshot() = txStorage.content.allPersisted.use { it.map { it.second.toSignedTx() }.toList() } } diff --git a/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt b/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt index 8a378d30c2..a6f9d35f3d 100644 --- a/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt +++ b/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt @@ -12,6 +12,7 @@ import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicReference +import java.util.stream.Stream /** * Implements a caching layer on top of an *append-only* table accessed via Hibernate mapping. Note that if the same key is [set] twice, @@ -36,24 +37,24 @@ abstract class AppendOnlyPersistentMapBase( /** * Returns the value associated with the key, first loading that value from the storage if necessary. */ - operator fun get(key: K): V? { - return cache.get(key)!!.orElse(null) - } + operator fun get(key: K): V? = cache.get(key)?.orElse(null) - val size get() = allPersisted().toList().size + val size: Long get() = allPersisted.use { it.count() } /** - * Returns all key/value pairs from the underlying storage. + * Returns all key/value pairs from the underlying storage in a [Stream]. + * + * Make sure to close the [Stream] once it's been processed. */ - fun allPersisted(): Sequence> { - val session = currentDBSession() - val criteriaQuery = session.criteriaBuilder.createQuery(persistentEntityClass) - val root = criteriaQuery.from(persistentEntityClass) - criteriaQuery.select(root) - val query = session.createQuery(criteriaQuery) - val result = query.resultList - return result.map { x -> fromPersistentEntity(x) }.asSequence() - } + val allPersisted: Stream> + get() { + val session = currentDBSession() + val criteriaQuery = session.criteriaBuilder.createQuery(persistentEntityClass) + val root = criteriaQuery.from(persistentEntityClass) + criteriaQuery.select(root) + val query = session.createQuery(criteriaQuery) + return query.stream().map(fromPersistentEntity) + } private fun set(key: K, value: V, logWarning: Boolean, store: (K, V) -> V?): Boolean { // Will be set to true if store says it isn't in the database. @@ -157,7 +158,7 @@ abstract class AppendOnlyPersistentMapBase( Transactional.InFlight(this, key, { loadValue(key) }, { loadValue(key)!! }) } else { // If no one is writing, then the value may or may not exist in the database. - Transactional.Unknown(this, key, { loadValue(key) }) + Transactional.Unknown(this, key) { loadValue(key) } } } diff --git a/node/src/main/kotlin/net/corda/notary/experimental/bftsmart/BFTSmart.kt b/node/src/main/kotlin/net/corda/notary/experimental/bftsmart/BFTSmart.kt index 5e85bf2de6..03865fed1a 100644 --- a/node/src/main/kotlin/net/corda/notary/experimental/bftsmart/BFTSmart.kt +++ b/node/src/main/kotlin/net/corda/notary/experimental/bftsmart/BFTSmart.kt @@ -335,7 +335,7 @@ object BFTSmart { // LinkedHashMap for deterministic serialisation val committedStates = LinkedHashMap() val requests = services.database.transaction { - commitLog.allPersisted().forEach { committedStates[it.first] = it.second } + commitLog.allPersisted.use { it.forEach { committedStates[it.first] = it.second } } val criteriaQuery = session.criteriaBuilder.createQuery(PersistentUniquenessProvider.Request::class.java) criteriaQuery.select(criteriaQuery.from(PersistentUniquenessProvider.Request::class.java)) session.createQuery(criteriaQuery).resultList diff --git a/node/src/main/kotlin/net/corda/notary/experimental/raft/RaftTransactionCommitLog.kt b/node/src/main/kotlin/net/corda/notary/experimental/raft/RaftTransactionCommitLog.kt index d6ae994154..5b08a9c6b8 100644 --- a/node/src/main/kotlin/net/corda/notary/experimental/raft/RaftTransactionCommitLog.kt +++ b/node/src/main/kotlin/net/corda/notary/experimental/raft/RaftTransactionCommitLog.kt @@ -175,11 +175,13 @@ class RaftTransactionCommitLog( */ override fun snapshot(writer: SnapshotWriter) { db.transaction { - writer.writeInt(map.size) - map.allPersisted().forEach { - val bytes = it.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes - writer.writeUnsignedShort(bytes.size) - writer.writeObject(bytes) + writer.writeInt(map.size.toInt()) + map.allPersisted.use { + it.forEach { + val bytes = it.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes + writer.writeUnsignedShort(bytes.size) + writer.writeObject(bytes) + } } val criteriaQuery = session.criteriaBuilder.createQuery(PersistentUniquenessProvider.Request::class.java)