Removed PersistentNetworkMapCache.start() (#3661)

The loading of all node infos in the start method was unnecessary, both for the changePublisher and _loadDBSuccess, and the setting of _registrationFuture was incorrect.
This commit is contained in:
Shams Asari 2018-07-23 14:01:14 +01:00 committed by GitHub
parent 6a895401c5
commit a0183fbdfd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 41 additions and 141 deletions

View File

@ -78,7 +78,7 @@ class ArtemisMessagingTest {
} }
LogHelper.setLevel(PersistentUniquenessProvider::class) LogHelper.setLevel(PersistentUniquenessProvider::class)
database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), { null }, { null }) database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), { null }, { null })
networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, emptyList()).start(), rigorousMock(), database) networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, emptyList()), rigorousMock(), database)
} }
@After @After

View File

@ -11,16 +11,7 @@ import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.InvocationContext import net.corda.core.context.InvocationContext
import net.corda.core.crypto.newSecureRandom import net.corda.core.crypto.newSecureRandom
import net.corda.core.crypto.sign import net.corda.core.crypto.sign
import net.corda.core.flows.ContractUpgradeFlow import net.corda.core.flows.*
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRefFactory
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.NotaryChangeFlow
import net.corda.core.flows.NotaryFlow
import net.corda.core.flows.StartableByService
import net.corda.core.identity.AbstractParty import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party import net.corda.core.identity.Party
@ -32,38 +23,20 @@ import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.notary.NotaryService import net.corda.core.internal.notary.NotaryService
import net.corda.core.internal.uncheckedCast import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.*
import net.corda.core.messaging.FlowHandle import net.corda.core.node.*
import net.corda.core.messaging.FlowHandleImpl import net.corda.core.node.services.*
import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.messaging.FlowProgressHandleImpl
import net.corda.core.messaging.RPCOps
import net.corda.core.node.AppServiceHub
import net.corda.core.node.NetworkParameters
import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceHub
import net.corda.core.node.ServicesForResolution
import net.corda.core.node.services.AttachmentStorage
import net.corda.core.node.services.CordaService
import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.KeyManagementService
import net.corda.core.node.services.TransactionVerifierService
import net.corda.core.serialization.SerializationWhitelist import net.corda.core.serialization.SerializationWhitelist
import net.corda.core.serialization.SerializeAsToken import net.corda.core.serialization.SerializeAsToken
import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.serialize import net.corda.core.serialization.serialize
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.*
import net.corda.core.utilities.days
import net.corda.core.utilities.debug
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.minutes
import net.corda.node.CordaClock import net.corda.node.CordaClock
import net.corda.node.VersionInfo import net.corda.node.VersionInfo
import net.corda.node.cordapp.CordappLoader import net.corda.node.cordapp.CordappLoader
import net.corda.node.internal.CheckpointVerifier.verifyCheckpointsCompatible import net.corda.node.internal.CheckpointVerifier.verifyCheckpointsCompatible
import net.corda.node.internal.classloading.requireAnnotation import net.corda.node.internal.classloading.requireAnnotation
import net.corda.node.internal.cordapp.CordappConfigFileProvider import net.corda.node.internal.cordapp.CordappConfigFileProvider
import net.corda.node.internal.cordapp.JarScanningCordappLoader
import net.corda.node.internal.cordapp.CordappProviderImpl import net.corda.node.internal.cordapp.CordappProviderImpl
import net.corda.node.internal.cordapp.CordappProviderInternal import net.corda.node.internal.cordapp.CordappProviderInternal
import net.corda.node.internal.rpc.proxies.AuthenticatedRpcOpsProxy import net.corda.node.internal.rpc.proxies.AuthenticatedRpcOpsProxy
@ -72,61 +45,21 @@ import net.corda.node.internal.security.RPCSecurityManager
import net.corda.node.services.ContractUpgradeHandler import net.corda.node.services.ContractUpgradeHandler
import net.corda.node.services.FinalityHandler import net.corda.node.services.FinalityHandler
import net.corda.node.services.NotaryChangeHandler import net.corda.node.services.NotaryChangeHandler
import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.api.*
import net.corda.node.services.api.DummyAuditService import net.corda.node.services.config.*
import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.IdentityServiceInternal
import net.corda.node.services.api.MonitoringService
import net.corda.node.services.api.NetworkMapCacheBaseInternal
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.api.NodePropertiesStore
import net.corda.node.services.api.SchedulerService
import net.corda.node.services.api.SchemaService
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.api.StartedNodeServices
import net.corda.node.services.api.VaultServiceInternal
import net.corda.node.services.api.WritableTransactionStorage
import net.corda.node.services.config.BFTSMaRtConfiguration
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.NotaryConfig
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.config.shell.toShellConfig import net.corda.node.services.config.shell.toShellConfig
import net.corda.node.services.config.shouldInitCrashShell
import net.corda.node.services.events.NodeSchedulerService import net.corda.node.services.events.NodeSchedulerService
import net.corda.node.services.events.ScheduledActivityObserver import net.corda.node.services.events.ScheduledActivityObserver
import net.corda.node.services.identity.PersistentIdentityService import net.corda.node.services.identity.PersistentIdentityService
import net.corda.node.services.keys.PersistentKeyManagementService import net.corda.node.services.keys.PersistentKeyManagementService
import net.corda.node.services.messaging.DeduplicationHandler import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.messaging.MessagingService import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.network.NetworkMapCacheImpl import net.corda.node.services.network.*
import net.corda.node.services.network.NetworkMapClient import net.corda.node.services.persistence.*
import net.corda.node.services.network.NetworkMapUpdater
import net.corda.node.services.network.NodeInfoWatcher
import net.corda.node.services.network.PersistentNetworkMapCache
import net.corda.node.services.persistence.AbstractPartyDescriptor
import net.corda.node.services.persistence.AbstractPartyToX500NameAsStringConverter
import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.node.services.persistence.DBTransactionMappingStorage
import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.node.services.persistence.NodeAttachmentService
import net.corda.node.services.persistence.NodePropertiesPersistentStore
import net.corda.node.services.schema.HibernateObserver import net.corda.node.services.schema.HibernateObserver
import net.corda.node.services.schema.NodeSchemaService import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.statemachine.ExternalEvent import net.corda.node.services.statemachine.*
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl import net.corda.node.services.transactions.*
import net.corda.node.services.statemachine.FlowMonitor
import net.corda.node.services.statemachine.SingleThreadedStateMachineManager
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.statemachine.StateMachineManagerInternal
import net.corda.node.services.statemachine.appName
import net.corda.node.services.statemachine.flowVersionAndInitiatingClass
import net.corda.node.services.transactions.BFTNonValidatingNotaryService
import net.corda.node.services.transactions.BFTSMaRt
import net.corda.node.services.transactions.RaftNonValidatingNotaryService
import net.corda.node.services.transactions.RaftUniquenessProvider
import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.node.services.transactions.ValidatingNotaryService
import net.corda.node.services.upgrade.ContractUpgradeServiceImpl import net.corda.node.services.upgrade.ContractUpgradeServiceImpl
import net.corda.node.services.vault.NodeVaultService import net.corda.node.services.vault.NodeVaultService
import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.AffinityExecutor
@ -271,7 +204,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
// TODO The fact that we need to specify an empty list of notaries just to generate our node info looks // TODO The fact that we need to specify an empty list of notaries just to generate our node info looks
// like a design smell. // like a design smell.
val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList()) val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList())
persistentNetworkMapCache.start()
val (_, nodeInfo) = updateNodeInfo(persistentNetworkMapCache, null, identity, identityKeyPair) val (_, nodeInfo) = updateNodeInfo(persistentNetworkMapCache, null, identity, identityKeyPair)
nodeInfo nodeInfo
} }
@ -282,8 +214,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
Node.printBasicNodeInfo("Clearing network map cache entries") Node.printBasicNodeInfo("Clearing network map cache entries")
log.info("Starting clearing of network map cache entries...") log.info("Starting clearing of network map cache entries...")
configureDatabase(configuration.dataSourceProperties, configuration.database, { null }, { null }).use { configureDatabase(configuration.dataSourceProperties, configuration.database, { null }, { null }).use {
val networkMapCache = PersistentNetworkMapCache(it, emptyList()) PersistentNetworkMapCache(it, emptyList()).clearNetworkMapCache()
networkMapCache.clearNetworkMapCache()
} }
} }
@ -321,7 +252,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
} }
val (startedImpl, schedulerService) = database.transaction { val (startedImpl, schedulerService) = database.transaction {
val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries).start(), identityService, database) val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries), identityService, database)
val (keyPairs, nodeInfo) = updateNodeInfo(networkMapCache, networkMapClient, identity, identityKeyPair) val (keyPairs, nodeInfo) = updateNodeInfo(networkMapCache, networkMapClient, identity, identityKeyPair)
identityService.loadIdentities(nodeInfo.legalIdentitiesAndCerts) identityService.loadIdentities(nodeInfo.legalIdentitiesAndCerts)
val metrics = MetricRegistry() val metrics = MetricRegistry()
@ -860,13 +791,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
} }
} }
protected open fun checkNetworkMapIsInitialized() {
if (!services.networkMapCache.loadDBSuccess) {
// TODO: There should be a consistent approach to configuration error exceptions.
throw NetworkMapCacheEmptyException()
}
}
protected open fun makeKeyManagementService(identityService: IdentityService, keyPairs: Set<KeyPair>, database: CordaPersistence): KeyManagementService { protected open fun makeKeyManagementService(identityService: IdentityService, keyPairs: Set<KeyPair>, database: CordaPersistence): KeyManagementService {
return PersistentKeyManagementService(identityService, keyPairs, database) return PersistentKeyManagementService(identityService, keyPairs, database)
} }
@ -1098,11 +1022,6 @@ internal class FlowStarterImpl(private val smm: StateMachineManager, private val
class ConfigurationException(message: String) : CordaException(message) class ConfigurationException(message: String) : CordaException(message)
/**
* Thrown when a node is about to start and its network map cache doesn't contain any node.
*/
internal class NetworkMapCacheEmptyException : Exception()
/** /**
* Creates the connection pool to the database. * Creates the connection pool to the database.
* *

View File

@ -24,7 +24,6 @@ import net.corda.node.services.network.NetworkMapUpdater
import net.corda.node.services.statemachine.ExternalEvent import net.corda.node.services.statemachine.ExternalEvent
import net.corda.node.services.statemachine.FlowStateMachineImpl import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.contextDatabase
interface NetworkMapCacheInternal : NetworkMapCache, NetworkMapCacheBaseInternal interface NetworkMapCacheInternal : NetworkMapCache, NetworkMapCacheBaseInternal
interface NetworkMapCacheBaseInternal : NetworkMapCacheBase { interface NetworkMapCacheBaseInternal : NetworkMapCacheBase {
@ -41,9 +40,6 @@ interface NetworkMapCacheBaseInternal : NetworkMapCacheBase {
/** Removes a node from the local cache. */ /** Removes a node from the local cache. */
fun removeNode(node: NodeInfo) fun removeNode(node: NodeInfo)
/** Indicates if loading network map data from database was successful. */
val loadDBSuccess: Boolean
} }
interface ServiceHubInternal : ServiceHub { interface ServiceHubInternal : ServiceHub {

View File

@ -91,26 +91,8 @@ open class PersistentNetworkMapCache(
// TODO revisit the logic under which nodeReady and loadDBSuccess are set. // TODO revisit the logic under which nodeReady and loadDBSuccess are set.
// with the NetworkMapService redesign their meaning is not too well defined. // with the NetworkMapService redesign their meaning is not too well defined.
private val _registrationFuture = openFuture<Void?>() private val _nodeReady = openFuture<Void?>()
override val nodeReady: CordaFuture<Void?> get() = _registrationFuture override val nodeReady: CordaFuture<Void?> = _nodeReady
private var _loadDBSuccess: Boolean = false
override val loadDBSuccess get() = _loadDBSuccess
fun start(): PersistentNetworkMapCache {
// if we find any network map information in the db, we are good to go - if not
// we have to wait for some being added
synchronized(_changed) {
val allNodes = database.transaction { getAllInfos(session) }
if (allNodes.isNotEmpty()) {
_loadDBSuccess = true
}
allNodes.forEach {
changePublisher.onNext(MapChange.Added(it.toNodeInfo()))
}
_registrationFuture.set(null)
}
return this
}
override val notaryIdentities: List<Party> = notaries.map { it.identity } override val notaryIdentities: List<Party> = notaries.map { it.identity }
private val validatingNotaries = notaries.mapNotNullTo(HashSet()) { if (it.validating) it.identity else null } private val validatingNotaries = notaries.mapNotNullTo(HashSet()) { if (it.validating) it.identity else null }
@ -166,11 +148,15 @@ open class PersistentNetworkMapCache(
} }
} }
override fun getNodesByLegalName(name: CordaX500Name): List<NodeInfo> = database.transaction { queryByLegalName(session, name) }.sortedByDescending { it.serial } override fun getNodesByLegalName(name: CordaX500Name): List<NodeInfo> {
return database.transaction { queryByLegalName(session, name) }.sortedByDescending { it.serial }
}
override fun getNodesByLegalIdentityKey(identityKey: PublicKey): List<NodeInfo> = nodesByKeyCache[identityKey]!! override fun getNodesByLegalIdentityKey(identityKey: PublicKey): List<NodeInfo> = nodesByKeyCache[identityKey]!!
private val nodesByKeyCache = NonInvalidatingCache<PublicKey, List<NodeInfo>>(1024, { key -> database.transaction { queryByIdentityKey(session, key) } }) private val nodesByKeyCache = NonInvalidatingCache<PublicKey, List<NodeInfo>>(1024) { key ->
database.transaction { queryByIdentityKey(session, key) }
}
override fun getNodesByOwningKeyIndex(identityKeyIndex: String): List<NodeInfo> { override fun getNodesByOwningKeyIndex(identityKeyIndex: String): List<NodeInfo> {
return database.transaction { return database.transaction {
@ -178,16 +164,21 @@ open class PersistentNetworkMapCache(
} }
} }
override fun getNodeByAddress(address: NetworkHostAndPort): NodeInfo? = database.transaction { queryByAddress(session, address) } override fun getNodeByAddress(address: NetworkHostAndPort): NodeInfo? {
return database.transaction { queryByAddress(session, address) }
}
override fun getPeerCertificateByLegalName(name: CordaX500Name): PartyAndCertificate? = identityByLegalNameCache.get(name)!!.orElse(null) override fun getPeerCertificateByLegalName(name: CordaX500Name): PartyAndCertificate? {
return identityByLegalNameCache.get(name)!!.orElse(null)
}
private val identityByLegalNameCache = NonInvalidatingCache<CordaX500Name, Optional<PartyAndCertificate>>(1024, { name -> Optional.ofNullable(database.transaction { queryIdentityByLegalName(session, name) }) }) private val identityByLegalNameCache = NonInvalidatingCache<CordaX500Name, Optional<PartyAndCertificate>>(1024) { name ->
Optional.ofNullable(database.transaction { queryIdentityByLegalName(session, name) })
}
override fun track(): DataFeed<List<NodeInfo>, MapChange> { override fun track(): DataFeed<List<NodeInfo>, MapChange> {
synchronized(_changed) { synchronized(_changed) {
val allInfos = database.transaction { getAllInfos(session).map { it.toNodeInfo() } } return DataFeed(allNodes, _changed.bufferUntilSubscribed().wrapWithDatabaseTransaction())
return DataFeed(allInfos, _changed.bufferUntilSubscribed().wrapWithDatabaseTransaction())
} }
} }
@ -214,9 +205,8 @@ open class PersistentNetworkMapCache(
logger.info("Previous node was identical to incoming one - doing nothing") logger.info("Previous node was identical to incoming one - doing nothing")
} }
} }
_loadDBSuccess = true // This is used in AbstractNode to indicate that node is ready. _nodeReady.set(null)
_registrationFuture.set(null) logger.debug { "Done adding node with info: $node" }
logger.info("Done adding node with info: $node")
} }
override fun removeNode(node: NodeInfo) { override fun removeNode(node: NodeInfo) {
@ -227,15 +217,16 @@ open class PersistentNetworkMapCache(
changePublisher.onNext(MapChange.Removed(node)) changePublisher.onNext(MapChange.Removed(node))
} }
} }
logger.info("Done removing node with info: $node") logger.debug { "Done removing node with info: $node" }
} }
override val allNodes: List<NodeInfo> override val allNodes: List<NodeInfo> get() {
get() = database.transaction { return database.transaction {
getAllInfos(session).map { it.toNodeInfo() } getAllNodeInfos(session).map { it.toNodeInfo() }
}
} }
private fun getAllInfos(session: Session): List<NodeInfoSchemaV1.PersistentNodeInfo> { private fun getAllNodeInfos(session: Session): List<NodeInfoSchemaV1.PersistentNodeInfo> {
val criteria = session.criteriaBuilder.createQuery(NodeInfoSchemaV1.PersistentNodeInfo::class.java) val criteria = session.criteriaBuilder.createQuery(NodeInfoSchemaV1.PersistentNodeInfo::class.java)
criteria.select(criteria.from(NodeInfoSchemaV1.PersistentNodeInfo::class.java)) criteria.select(criteria.from(NodeInfoSchemaV1.PersistentNodeInfo::class.java))
return session.createQuery(criteria).resultList return session.createQuery(criteria).resultList
@ -344,7 +335,7 @@ open class PersistentNetworkMapCache(
logger.info("Clearing Network Map Cache entries") logger.info("Clearing Network Map Cache entries")
invalidateCaches() invalidateCaches()
database.transaction { database.transaction {
val result = getAllInfos(session) val result = getAllNodeInfos(session)
logger.debug { "Number of node infos to be cleared: ${result.size}" } logger.debug { "Number of node infos to be cleared: ${result.size}" }
for (nodeInfo in result) session.remove(nodeInfo) for (nodeInfo in result) session.remove(nodeInfo)
} }

View File

@ -304,12 +304,6 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe
return Crypto.deriveKeyPairFromEntropy(Crypto.EDDSA_ED25519_SHA512, counter) return Crypto.deriveKeyPairFromEntropy(Crypto.EDDSA_ED25519_SHA512, counter)
} }
/**
* InternalMockNetwork will ensure nodes are connected to each other. The nodes themselves
* won't be able to tell if that happened already or not.
*/
override fun checkNetworkMapIsInitialized() = Unit
override fun makeTransactionVerifierService() = InMemoryTransactionVerifierService(1) override fun makeTransactionVerifierService() = InMemoryTransactionVerifierService(1)
// NodeInfo requires a non-empty addresses list and so we give it a dummy value for mock nodes. // NodeInfo requires a non-empty addresses list and so we give it a dummy value for mock nodes.