mirror of
https://github.com/corda/corda.git
synced 2025-06-16 22:28:15 +00:00
CORDA-599 PersistentNetworkMapCache no longer circularly depends on SH (#1652)
This commit is contained in:
@ -46,7 +46,7 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
|
||||
@Test
|
||||
fun `get nodes by owning key and by name, no network map service`() {
|
||||
val alice = startNodesWithPort(listOf(ALICE), noNetworkMap = true)[0]
|
||||
val netCache = alice.services.networkMapCache as PersistentNetworkMapCache
|
||||
val netCache = alice.services.networkMapCache
|
||||
alice.database.transaction {
|
||||
val res = netCache.getNodeByLegalIdentity(alice.info.chooseIdentity())
|
||||
assertEquals(alice.info, res)
|
||||
@ -58,7 +58,7 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
|
||||
@Test
|
||||
fun `get nodes by address no network map service`() {
|
||||
val alice = startNodesWithPort(listOf(ALICE), noNetworkMap = true)[0]
|
||||
val netCache = alice.services.networkMapCache as PersistentNetworkMapCache
|
||||
val netCache = alice.services.networkMapCache
|
||||
alice.database.transaction {
|
||||
val res = netCache.getNodeByAddress(alice.info.addresses[0])
|
||||
assertEquals(alice.info, res)
|
||||
|
@ -24,7 +24,6 @@ import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.node.StateLoader
|
||||
import net.corda.core.node.services.*
|
||||
import net.corda.core.node.services.NetworkMapCache.MapChange
|
||||
import net.corda.core.serialization.SerializationWhitelist
|
||||
import net.corda.core.serialization.SerializeAsToken
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
@ -142,6 +141,7 @@ abstract class AbstractNode(config: NodeConfiguration,
|
||||
protected val services: ServiceHubInternal get() = _services
|
||||
private lateinit var _services: ServiceHubInternalImpl
|
||||
protected lateinit var legalIdentity: PartyAndCertificate
|
||||
private lateinit var allIdentities: List<PartyAndCertificate>
|
||||
protected lateinit var info: NodeInfo
|
||||
protected var myNotaryIdentity: PartyAndCertificate? = null
|
||||
protected lateinit var checkpointStorage: CheckpointStorage
|
||||
@ -468,8 +468,12 @@ abstract class AbstractNode(config: NodeConfiguration,
|
||||
val cordappProvider = CordappProviderImpl(cordappLoader, attachments)
|
||||
_services = ServiceHubInternalImpl(schemaService, transactionStorage, stateLoader, MonitoringService(metrics), cordappProvider)
|
||||
legalIdentity = obtainIdentity(notaryConfig = null)
|
||||
// TODO We keep only notary identity as additional legalIdentity if we run it on a node . Multiple identities need more design thinking.
|
||||
myNotaryIdentity = getNotaryIdentity()
|
||||
allIdentities = listOf(legalIdentity, myNotaryIdentity).filterNotNull()
|
||||
network = makeMessagingService(legalIdentity)
|
||||
info = makeInfo(legalIdentity)
|
||||
val addresses = myAddresses() // TODO There is no support for multiple IP addresses yet.
|
||||
info = NodeInfo(addresses, allIdentities, versionInfo.platformVersion, platformClock.instant().toEpochMilli())
|
||||
val networkMapCache = services.networkMapCache
|
||||
val tokenizableServices = mutableListOf(attachments, network, services.vaultService,
|
||||
services.keyManagementService, services.identityService, platformClock,
|
||||
@ -488,15 +492,6 @@ abstract class AbstractNode(config: NodeConfiguration,
|
||||
HibernateObserver.install(services.vaultService.rawUpdates, database.hibernateConfig)
|
||||
}
|
||||
|
||||
private fun makeInfo(legalIdentity: PartyAndCertificate): NodeInfo {
|
||||
// TODO We keep only notary identity as additional legalIdentity if we run it on a node . Multiple identities need more design thinking.
|
||||
myNotaryIdentity = getNotaryIdentity()
|
||||
val allIdentitiesList = mutableListOf(legalIdentity)
|
||||
myNotaryIdentity?.let { allIdentitiesList.add(it) }
|
||||
val addresses = myAddresses() // TODO There is no support for multiple IP addresses yet.
|
||||
return NodeInfo(addresses, allIdentitiesList, versionInfo.platformVersion, platformClock.instant().toEpochMilli())
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtain the node's notary identity if it's configured to be one. If part of a distributed notary then this will be
|
||||
* the distributed identity shared across all the nodes of the cluster.
|
||||
@ -665,17 +660,7 @@ abstract class AbstractNode(config: NodeConfiguration,
|
||||
val caCertificates: Array<X509Certificate> = listOf(legalIdentity.certificate, clientCa?.certificate?.cert)
|
||||
.filterNotNull()
|
||||
.toTypedArray()
|
||||
val service = PersistentIdentityService(info.legalIdentitiesAndCerts, trustRoot = trustRoot, caCertificates = *caCertificates)
|
||||
services.networkMapCache.allNodes.forEach { it.legalIdentitiesAndCerts.forEach { service.verifyAndRegisterIdentity(it) } }
|
||||
services.networkMapCache.changed.subscribe { mapChange ->
|
||||
// TODO how should we handle network map removal
|
||||
if (mapChange is MapChange.Added) {
|
||||
mapChange.node.legalIdentitiesAndCerts.forEach {
|
||||
service.verifyAndRegisterIdentity(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
return service
|
||||
return PersistentIdentityService(allIdentities, trustRoot = trustRoot, caCertificates = *caCertificates)
|
||||
}
|
||||
|
||||
protected abstract fun makeTransactionVerifierService(): TransactionVerifierService
|
||||
@ -774,7 +759,7 @@ abstract class AbstractNode(config: NodeConfiguration,
|
||||
override val stateMachineRecordedTransactionMapping = DBTransactionMappingStorage()
|
||||
override val auditService = DummyAuditService()
|
||||
override val transactionVerifierService by lazy { makeTransactionVerifierService() }
|
||||
override val networkMapCache by lazy { PersistentNetworkMapCache(this) }
|
||||
override val networkMapCache by lazy { NetworkMapCacheImpl(PersistentNetworkMapCache(this@AbstractNode.database, this@AbstractNode.configuration), identityService) }
|
||||
override val vaultService by lazy { makeVaultService(keyManagementService, stateLoader) }
|
||||
override val contractUpgradeService by lazy { ContractUpgradeServiceImpl() }
|
||||
|
||||
|
@ -16,6 +16,7 @@ import net.corda.core.messaging.StateMachineTransactionMapping
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.node.services.NetworkMapCache
|
||||
import net.corda.core.node.services.NetworkMapCacheBase
|
||||
import net.corda.core.node.services.TransactionStorage
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
@ -28,7 +29,8 @@ import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
|
||||
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||
import net.corda.node.utilities.CordaPersistence
|
||||
|
||||
interface NetworkMapCacheInternal : NetworkMapCache {
|
||||
interface NetworkMapCacheInternal : NetworkMapCache, NetworkMapCacheBaseInternal
|
||||
interface NetworkMapCacheBaseInternal : NetworkMapCacheBase {
|
||||
/**
|
||||
* Deregister from updates from the given map service.
|
||||
* @param network the network messaging service.
|
||||
|
@ -12,6 +12,7 @@ import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.services.IdentityService
|
||||
import net.corda.core.node.services.NetworkMapCache.MapChange
|
||||
import net.corda.core.node.services.NotaryService
|
||||
import net.corda.core.node.services.PartyInfo
|
||||
@ -24,12 +25,14 @@ import net.corda.core.utilities.loggerFor
|
||||
import net.corda.core.utilities.toBase58String
|
||||
import net.corda.node.services.api.NetworkCacheException
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.api.NetworkMapCacheBaseInternal
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.node.services.messaging.createMessage
|
||||
import net.corda.node.services.messaging.sendRequest
|
||||
import net.corda.node.services.network.NetworkMapService.FetchMapResponse
|
||||
import net.corda.node.services.network.NetworkMapService.SubscribeResponse
|
||||
import net.corda.node.utilities.*
|
||||
import net.corda.node.utilities.AddOrRemove
|
||||
import net.corda.node.utilities.bufferUntilDatabaseCommit
|
||||
import net.corda.node.utilities.wrapWithDatabaseTransaction
|
||||
@ -42,15 +45,32 @@ import java.util.*
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
import kotlin.collections.HashMap
|
||||
|
||||
class NetworkMapCacheImpl(networkMapCacheBase: NetworkMapCacheBaseInternal, private val identityService: IdentityService) : NetworkMapCacheBaseInternal by networkMapCacheBase, NetworkMapCacheInternal {
|
||||
init {
|
||||
networkMapCacheBase.allNodes.forEach { it.legalIdentitiesAndCerts.forEach { identityService.verifyAndRegisterIdentity(it) } }
|
||||
networkMapCacheBase.changed.subscribe { mapChange ->
|
||||
// TODO how should we handle network map removal
|
||||
if (mapChange is MapChange.Added) {
|
||||
mapChange.node.legalIdentitiesAndCerts.forEach {
|
||||
identityService.verifyAndRegisterIdentity(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun getNodeByLegalIdentity(party: AbstractParty): NodeInfo? {
|
||||
val wellKnownParty = identityService.wellKnownPartyFromAnonymous(party)
|
||||
return wellKnownParty?.let {
|
||||
getNodesByLegalIdentityKey(it.owningKey).firstOrNull()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extremely simple in-memory cache of the network map.
|
||||
*
|
||||
* @param serviceHub an optional service hub from which we'll take the identity service. We take a service hub rather
|
||||
* than the identity service directly, as this avoids problems with service start sequence (network map cache
|
||||
* and identity services depend on each other). Should always be provided except for unit test cases.
|
||||
*/
|
||||
@ThreadSafe
|
||||
open class PersistentNetworkMapCache(private val serviceHub: ServiceHubInternal) : SingletonSerializeAsToken(), NetworkMapCacheInternal {
|
||||
open class PersistentNetworkMapCache(private val database: CordaPersistence, configuration: NodeConfiguration) : SingletonSerializeAsToken(), NetworkMapCacheBaseInternal {
|
||||
companion object {
|
||||
val logger = loggerFor<PersistentNetworkMapCache>()
|
||||
}
|
||||
@ -86,12 +106,12 @@ open class PersistentNetworkMapCache(private val serviceHub: ServiceHubInternal)
|
||||
.sortedBy { it.name.toString() }
|
||||
}
|
||||
|
||||
private val nodeInfoSerializer = NodeInfoWatcher(serviceHub.configuration.baseDirectory,
|
||||
serviceHub.configuration.additionalNodeInfoPollingFrequencyMsec)
|
||||
private val nodeInfoSerializer = NodeInfoWatcher(configuration.baseDirectory,
|
||||
configuration.additionalNodeInfoPollingFrequencyMsec)
|
||||
|
||||
init {
|
||||
loadFromFiles()
|
||||
serviceHub.database.transaction { loadFromDB(session) }
|
||||
database.transaction { loadFromDB(session) }
|
||||
}
|
||||
|
||||
private fun loadFromFiles() {
|
||||
@ -100,7 +120,7 @@ open class PersistentNetworkMapCache(private val serviceHub: ServiceHubInternal)
|
||||
}
|
||||
|
||||
override fun getPartyInfo(party: Party): PartyInfo? {
|
||||
val nodes = serviceHub.database.transaction { queryByIdentityKey(session, party.owningKey) }
|
||||
val nodes = database.transaction { queryByIdentityKey(session, party.owningKey) }
|
||||
if (nodes.size == 1 && nodes[0].isLegalIdentity(party)) {
|
||||
return PartyInfo.SingleNode(party, nodes[0].addresses)
|
||||
}
|
||||
@ -115,20 +135,13 @@ open class PersistentNetworkMapCache(private val serviceHub: ServiceHubInternal)
|
||||
}
|
||||
|
||||
override fun getNodeByLegalName(name: CordaX500Name): NodeInfo? = getNodesByLegalName(name).firstOrNull()
|
||||
override fun getNodesByLegalName(name: CordaX500Name): List<NodeInfo> = serviceHub.database.transaction { queryByLegalName(session, name) }
|
||||
override fun getNodesByLegalName(name: CordaX500Name): List<NodeInfo> = database.transaction { queryByLegalName(session, name) }
|
||||
override fun getNodesByLegalIdentityKey(identityKey: PublicKey): List<NodeInfo> =
|
||||
serviceHub.database.transaction { queryByIdentityKey(session, identityKey) }
|
||||
database.transaction { queryByIdentityKey(session, identityKey) }
|
||||
|
||||
override fun getNodeByLegalIdentity(party: AbstractParty): NodeInfo? {
|
||||
val wellKnownParty = serviceHub.identityService.wellKnownPartyFromAnonymous(party)
|
||||
return wellKnownParty?.let {
|
||||
getNodesByLegalIdentityKey(it.owningKey).firstOrNull()
|
||||
}
|
||||
}
|
||||
override fun getNodeByAddress(address: NetworkHostAndPort): NodeInfo? = database.transaction { queryByAddress(session, address) }
|
||||
|
||||
override fun getNodeByAddress(address: NetworkHostAndPort): NodeInfo? = serviceHub.database.transaction { queryByAddress(session, address) }
|
||||
|
||||
override fun getPeerCertificateByLegalName(name: CordaX500Name): PartyAndCertificate? = serviceHub.database.transaction { queryIdentityByLegalName(session, name) }
|
||||
override fun getPeerCertificateByLegalName(name: CordaX500Name): PartyAndCertificate? = database.transaction { queryIdentityByLegalName(session, name) }
|
||||
|
||||
override fun track(): DataFeed<List<NodeInfo>, MapChange> {
|
||||
synchronized(_changed) {
|
||||
@ -180,13 +193,13 @@ open class PersistentNetworkMapCache(private val serviceHub: ServiceHubInternal)
|
||||
val previousNode = registeredNodes.put(node.legalIdentities.first().owningKey, node) // TODO hack... we left the first one as special one
|
||||
if (previousNode == null) {
|
||||
logger.info("No previous node found")
|
||||
serviceHub.database.transaction {
|
||||
database.transaction {
|
||||
updateInfoDB(node)
|
||||
changePublisher.onNext(MapChange.Added(node))
|
||||
}
|
||||
} else if (previousNode != node) {
|
||||
logger.info("Previous node was found as: $previousNode")
|
||||
serviceHub.database.transaction {
|
||||
database.transaction {
|
||||
updateInfoDB(node)
|
||||
changePublisher.onNext(MapChange.Modified(node, previousNode))
|
||||
}
|
||||
@ -201,7 +214,7 @@ open class PersistentNetworkMapCache(private val serviceHub: ServiceHubInternal)
|
||||
logger.info("Removing node with info: $node")
|
||||
synchronized(_changed) {
|
||||
registeredNodes.remove(node.legalIdentities.first().owningKey)
|
||||
serviceHub.database.transaction {
|
||||
database.transaction {
|
||||
removeInfoDB(session, node)
|
||||
changePublisher.onNext(MapChange.Removed(node))
|
||||
}
|
||||
@ -236,7 +249,7 @@ open class PersistentNetworkMapCache(private val serviceHub: ServiceHubInternal)
|
||||
}
|
||||
|
||||
override val allNodes: List<NodeInfo>
|
||||
get() = serviceHub.database.transaction {
|
||||
get() = database.transaction {
|
||||
getAllInfos(session).map { it.toNodeInfo() }
|
||||
}
|
||||
|
||||
@ -285,8 +298,8 @@ open class PersistentNetworkMapCache(private val serviceHub: ServiceHubInternal)
|
||||
private fun updateInfoDB(nodeInfo: NodeInfo) {
|
||||
// TODO Temporary workaround to force isolated transaction (otherwise it causes race conditions when processing
|
||||
// network map registration on network map node)
|
||||
serviceHub.database.dataSource.connection.use {
|
||||
val session = serviceHub.database.entityManagerFactory.withOptions().connection(it.apply {
|
||||
database.dataSource.connection.use {
|
||||
val session = database.entityManagerFactory.withOptions().connection(it.apply {
|
||||
transactionIsolation = 1
|
||||
}).openSession()
|
||||
session.use {
|
||||
@ -367,7 +380,7 @@ open class PersistentNetworkMapCache(private val serviceHub: ServiceHubInternal)
|
||||
}
|
||||
|
||||
override fun clearNetworkMapCache() {
|
||||
serviceHub.database.transaction {
|
||||
database.transaction {
|
||||
val result = getAllInfos(session)
|
||||
for (nodeInfo in result) session.remove(nodeInfo)
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package net.corda.node.services.messaging
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.nhaarman.mockito_kotlin.mock
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
import net.corda.core.internal.concurrent.doneFuture
|
||||
@ -12,10 +13,10 @@ import net.corda.node.services.RPCUserServiceImpl
|
||||
import net.corda.node.services.api.MonitoringService
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.configureWithDevSSLCertificate
|
||||
import net.corda.node.services.network.NetworkMapCacheImpl
|
||||
import net.corda.node.services.network.PersistentNetworkMapCache
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||
import net.corda.node.testing.MockServiceHubInternal
|
||||
import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
|
||||
import net.corda.node.utilities.CordaPersistence
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
@ -57,7 +58,7 @@ class ArtemisMessagingTests : TestDependencyInjectionBase() {
|
||||
var messagingClient: NodeMessagingClient? = null
|
||||
var messagingServer: ArtemisMessagingServer? = null
|
||||
|
||||
lateinit var networkMapCache: PersistentNetworkMapCache
|
||||
lateinit var networkMapCache: NetworkMapCacheImpl
|
||||
|
||||
val rpcOps = object : RPCOps {
|
||||
override val protocolVersion: Int get() = throw UnsupportedOperationException()
|
||||
@ -73,7 +74,7 @@ class ArtemisMessagingTests : TestDependencyInjectionBase() {
|
||||
LogHelper.setLevel(PersistentUniquenessProvider::class)
|
||||
database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties(), ::makeTestIdentityService)
|
||||
networkMapRegistrationFuture = doneFuture(Unit)
|
||||
networkMapCache = PersistentNetworkMapCache(serviceHub = object : MockServiceHubInternal(database, config) {})
|
||||
networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, config), mock())
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -83,7 +83,7 @@ class NetworkMapCacheTest {
|
||||
val aliceNode = mockNet.createPartyNode(ALICE.name)
|
||||
val notaryLegalIdentity = notaryNode.info.chooseIdentity()
|
||||
val alice = aliceNode.info.chooseIdentity()
|
||||
val notaryCache = notaryNode.services.networkMapCache as PersistentNetworkMapCache
|
||||
val notaryCache = notaryNode.services.networkMapCache
|
||||
mockNet.runNetwork()
|
||||
notaryNode.database.transaction {
|
||||
assertThat(notaryCache.getNodeByLegalIdentity(alice) != null)
|
||||
|
Reference in New Issue
Block a user