mirror of
https://github.com/corda/corda.git
synced 2025-04-06 19:07:08 +00:00
[CORDA-446] Remove entities from netmap cache & friends (#1980)
* Remove PersistentNetworkMapService and related classes. They were part of the old network map node code.
This commit is contained in:
parent
837e8800e8
commit
5490465750
@ -563,8 +563,6 @@ abstract class AbstractNode(config: NodeConfiguration,
|
||||
return PersistentKeyManagementService(identityService, partyKeys)
|
||||
}
|
||||
|
||||
abstract protected fun makeNetworkMapService(network: MessagingService, networkMapCache: NetworkMapCacheInternal): NetworkMapService
|
||||
|
||||
private fun makeCoreNotaryService(notaryConfig: NotaryConfig): NotaryService {
|
||||
val notaryKey = myNotaryIdentity?.owningKey ?: throw IllegalArgumentException("No notary identity initialized when creating a notary service")
|
||||
return if (notaryConfig.validating) {
|
||||
|
@ -19,14 +19,11 @@ import net.corda.node.serialization.KryoServerSerializationScheme
|
||||
import net.corda.node.serialization.NodeClock
|
||||
import net.corda.node.services.RPCUserService
|
||||
import net.corda.node.services.RPCUserServiceImpl
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.node.services.api.SchemaService
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||
import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.node.services.messaging.NodeMessagingClient
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.network.PersistentNetworkMapService
|
||||
import net.corda.node.utilities.AddressUtils
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.node.utilities.TestClock
|
||||
@ -35,10 +32,6 @@ import net.corda.nodeapi.internal.ShutdownHook
|
||||
import net.corda.nodeapi.internal.addShutdownHook
|
||||
import net.corda.nodeapi.internal.serialization.*
|
||||
import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException
|
||||
import org.apache.activemq.artemis.api.core.RoutingType
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.time.Clock
|
||||
@ -208,10 +201,6 @@ open class Node(override val configuration: NodeConfiguration,
|
||||
return listOf(address.hostAndPort)
|
||||
}
|
||||
|
||||
override fun makeNetworkMapService(network: MessagingService, networkMapCache: NetworkMapCacheInternal): NetworkMapService {
|
||||
return PersistentNetworkMapService()
|
||||
}
|
||||
|
||||
/**
|
||||
* If the node is persisting to an embedded H2 database, then expose this via TCP with a JDBC URL of the form:
|
||||
* jdbc:h2:tcp://<host>:<port>/node
|
||||
|
@ -1,68 +0,0 @@
|
||||
package net.corda.node.services.network
|
||||
|
||||
import net.corda.core.identity.PartyAndCertificate
|
||||
import net.corda.core.internal.ThreadBox
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.services.NetworkMapCache
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.node.utilities.AddOrRemove
|
||||
import java.time.Instant
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
/**
|
||||
* A network map contains lists of nodes on the network along with information about their identity keys, services
|
||||
* they provide and host names or IP addresses where they can be connected to. This information is cached locally within
|
||||
* nodes, by the [NetworkMapCache]. Currently very basic consensus controls are applied, using signed changes which
|
||||
* replace each other based on a serial number present in the change.
|
||||
*/
|
||||
// TODO: A better architecture for the network map service might be one like the Tor directory authorities, where
|
||||
// several nodes linked by RAFT or Paxos elect a leader and that leader distributes signed documents describing the
|
||||
// network layout. Those documents can then be cached by every node and thus a network map can/ be retrieved given only
|
||||
// a single successful peer connection.
|
||||
//
|
||||
// It may also be that this is replaced or merged with the identity management service; for example if the network has
|
||||
// a concept of identity changes over time, should that include the node for an identity? If so, that is likely to
|
||||
// replace this service.
|
||||
@ThreadSafe
|
||||
interface NetworkMapService {
|
||||
|
||||
val nodeRegistrations: Map<PartyAndCertificate, NodeRegistrationInfo>
|
||||
|
||||
// Map from subscriber address, to most recently acknowledged update map version.
|
||||
val subscribers: ThreadBox<MutableMap<SingleMessageRecipient, LastAcknowledgeInfo>>
|
||||
}
|
||||
|
||||
|
||||
@ThreadSafe
|
||||
class InMemoryNetworkMapService: NetworkMapService {
|
||||
|
||||
override val nodeRegistrations: MutableMap<PartyAndCertificate, NodeRegistrationInfo> = ConcurrentHashMap()
|
||||
override val subscribers = ThreadBox(mutableMapOf<SingleMessageRecipient, LastAcknowledgeInfo>())
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* A node registration state in the network map.
|
||||
*
|
||||
* @param node the node being added/removed.
|
||||
* @param serial an increasing value which represents the version of this registration. Not expected to be sequential,
|
||||
* but later versions of the registration must have higher values (or they will be ignored by the map service).
|
||||
* Similar to the serial number on DNS records.
|
||||
* @param type add if the node is being added to the map, or remove if a previous node is being removed (indicated as
|
||||
* going offline).
|
||||
* @param expires when the registration expires. Only used when adding a node to a map.
|
||||
*/
|
||||
// TODO: This might alternatively want to have a node and party, with the node being optional, so registering a node
|
||||
// involves providing both node and paerty, and deregistering a node involves a request with party but no node.
|
||||
@CordaSerializable
|
||||
data class NodeRegistration(val node: NodeInfo, val serial: Long, val type: AddOrRemove, var expires: Instant) {
|
||||
override fun toString(): String = "$node #$serial ($type)"
|
||||
}
|
||||
|
||||
@CordaSerializable
|
||||
data class LastAcknowledgeInfo(val mapVersion: Int)
|
||||
|
||||
@CordaSerializable
|
||||
data class NodeRegistrationInfo(val reg: NodeRegistration, val mapVersion: Int)
|
@ -1,124 +0,0 @@
|
||||
package net.corda.node.services.network
|
||||
|
||||
import net.corda.core.crypto.toStringShort
|
||||
import net.corda.core.identity.PartyAndCertificate
|
||||
import net.corda.core.internal.ThreadBox
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.serialization.SerializationDefaults
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.MAX_HASH_HEX_SIZE
|
||||
import net.corda.node.utilities.NODE_DATABASE_PREFIX
|
||||
import net.corda.node.utilities.PersistentMap
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent
|
||||
import java.io.ByteArrayInputStream
|
||||
import java.security.cert.CertificateFactory
|
||||
import java.util.*
|
||||
import javax.persistence.*
|
||||
|
||||
/**
|
||||
* A network map service backed by a database to survive restarts of the node hosting it.
|
||||
*
|
||||
* Majority of the logic is inherited from [NetworkMapService].
|
||||
*
|
||||
* This class needs database transactions to be in-flight during method calls and init, otherwise it will throw
|
||||
* exceptions.
|
||||
*/
|
||||
class PersistentNetworkMapService: NetworkMapService {
|
||||
|
||||
// Only the node_party_path column is needed to reconstruct a PartyAndCertificate but we have the others for human readability
|
||||
@Entity
|
||||
@Table(name = "${NODE_DATABASE_PREFIX}network_map_nodes")
|
||||
class NetworkNode(
|
||||
@Id
|
||||
@Column(name = "node_party_key_hash", length = MAX_HASH_HEX_SIZE)
|
||||
var publicKeyHash: String,
|
||||
|
||||
@Column
|
||||
var nodeParty: NodeParty = NodeParty(),
|
||||
|
||||
@Lob @Column
|
||||
var registrationInfo: ByteArray = ByteArray(0)
|
||||
)
|
||||
|
||||
@Embeddable
|
||||
class NodeParty(
|
||||
@Column(name = "node_party_name")
|
||||
var name: String = "",
|
||||
|
||||
@Column(name = "node_party_certificate", length = 4096)
|
||||
var certificate: ByteArray = ByteArray(0),
|
||||
|
||||
@Column(name = "node_party_path", length = 4096)
|
||||
var certPath: ByteArray = ByteArray(0)
|
||||
)
|
||||
|
||||
private companion object {
|
||||
private val factory = CertificateFactory.getInstance("X.509")
|
||||
|
||||
fun createNetworkNodesMap(): PersistentMap<PartyAndCertificate, NodeRegistrationInfo, NetworkNode, String> {
|
||||
return PersistentMap(
|
||||
toPersistentEntityKey = { it.owningKey.toStringShort() },
|
||||
fromPersistentEntity = {
|
||||
Pair(PartyAndCertificate(factory.generateCertPath(ByteArrayInputStream(it.nodeParty.certPath))),
|
||||
it.registrationInfo.deserialize(context = SerializationDefaults.STORAGE_CONTEXT))
|
||||
},
|
||||
toPersistentEntity = { key: PartyAndCertificate, value: NodeRegistrationInfo ->
|
||||
NetworkNode(
|
||||
publicKeyHash = key.owningKey.toStringShort(),
|
||||
nodeParty = NodeParty(
|
||||
key.name.toString(),
|
||||
key.certificate.encoded,
|
||||
key.certPath.encoded
|
||||
),
|
||||
registrationInfo = value.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes
|
||||
)
|
||||
},
|
||||
persistentEntityClass = NetworkNode::class.java
|
||||
)
|
||||
}
|
||||
|
||||
fun createNetworkSubscribersMap(): PersistentMap<SingleMessageRecipient, LastAcknowledgeInfo, NetworkSubscriber, String> {
|
||||
return PersistentMap(
|
||||
toPersistentEntityKey = { it.getPrimaryKeyBasedOnSubType() },
|
||||
fromPersistentEntity = {
|
||||
Pair(it.key.deserialize(context = SerializationDefaults.STORAGE_CONTEXT),
|
||||
it.value.deserialize(context = SerializationDefaults.STORAGE_CONTEXT))
|
||||
},
|
||||
toPersistentEntity = { k: SingleMessageRecipient, v: LastAcknowledgeInfo ->
|
||||
NetworkSubscriber(
|
||||
id = k.getPrimaryKeyBasedOnSubType(),
|
||||
key = k.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes,
|
||||
value = v.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes
|
||||
)
|
||||
},
|
||||
persistentEntityClass = NetworkSubscriber::class.java
|
||||
)
|
||||
}
|
||||
|
||||
fun SingleMessageRecipient.getPrimaryKeyBasedOnSubType() =
|
||||
if (this is ArtemisMessagingComponent.ArtemisPeerAddress) {
|
||||
this.hostAndPort.toString()
|
||||
} else {
|
||||
this.toString()
|
||||
}
|
||||
}
|
||||
|
||||
override val nodeRegistrations: MutableMap<PartyAndCertificate, NodeRegistrationInfo> =
|
||||
Collections.synchronizedMap(createNetworkNodesMap())
|
||||
|
||||
@Entity
|
||||
@Table(name = "${NODE_DATABASE_PREFIX}network_map_subscribers")
|
||||
class NetworkSubscriber(
|
||||
@Id @Column
|
||||
var id: String = "",
|
||||
|
||||
@Column(length = 4096)
|
||||
var key: ByteArray = ByteArray(0),
|
||||
|
||||
@Column(length = 4096)
|
||||
var value: ByteArray = ByteArray(0)
|
||||
)
|
||||
|
||||
override val subscribers = ThreadBox(createNetworkSubscribersMap())
|
||||
}
|
@ -3,11 +3,7 @@ package net.corda.node.services.schema
|
||||
import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.contracts.FungibleAsset
|
||||
import net.corda.core.contracts.LinearState
|
||||
import net.corda.core.schemas.CommonSchemaV1
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.schemas.NodeInfoSchemaV1
|
||||
import net.corda.core.schemas.PersistentState
|
||||
import net.corda.core.schemas.QueryableState
|
||||
import net.corda.core.schemas.*
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.node.internal.cordapp.CordappLoader
|
||||
import net.corda.node.services.api.SchemaService
|
||||
@ -15,7 +11,6 @@ import net.corda.node.services.events.NodeSchedulerService
|
||||
import net.corda.node.services.identity.PersistentIdentityService
|
||||
import net.corda.node.services.keys.PersistentKeyManagementService
|
||||
import net.corda.node.services.messaging.NodeMessagingClient
|
||||
import net.corda.node.services.network.PersistentNetworkMapService
|
||||
import net.corda.node.services.persistence.DBCheckpointStorage
|
||||
import net.corda.node.services.persistence.DBTransactionMappingStorage
|
||||
import net.corda.node.services.persistence.DBTransactionStorage
|
||||
@ -48,8 +43,6 @@ class NodeSchemaService(cordappLoader: CordappLoader?) : SchemaService, Singleto
|
||||
PersistentUniquenessProvider.PersistentNotaryCommit::class.java,
|
||||
NodeSchedulerService.PersistentScheduledState::class.java,
|
||||
NodeAttachmentService.DBAttachment::class.java,
|
||||
PersistentNetworkMapService.NetworkNode::class.java,
|
||||
PersistentNetworkMapService.NetworkSubscriber::class.java,
|
||||
NodeMessagingClient.ProcessedMessage::class.java,
|
||||
NodeMessagingClient.RetryMessage::class.java,
|
||||
NodeAttachmentService.DBAttachment::class.java,
|
||||
|
@ -8,11 +8,12 @@ import io.atomix.copycat.server.storage.Storage
|
||||
import io.atomix.copycat.server.storage.StorageLevel
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.utilities.CordaPersistence
|
||||
import net.corda.node.utilities.DatabaseTransaction
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.testing.*
|
||||
import net.corda.testing.LogHelper
|
||||
import net.corda.testing.SerializationEnvironmentRule
|
||||
import net.corda.testing.freeLocalHostAndPort
|
||||
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
||||
import net.corda.testing.node.MockServices.Companion.makeTestDatabaseProperties
|
||||
import net.corda.testing.node.MockServices.Companion.makeTestIdentityService
|
||||
@ -37,14 +38,12 @@ class DistributedImmutableMapTests {
|
||||
@Before
|
||||
fun setup() {
|
||||
LogHelper.setLevel("-org.apache.activemq")
|
||||
LogHelper.setLevel(NetworkMapService::class)
|
||||
cluster = setUpCluster()
|
||||
}
|
||||
|
||||
@After
|
||||
fun tearDown() {
|
||||
LogHelper.reset("org.apache.activemq")
|
||||
LogHelper.reset(NetworkMapService::class)
|
||||
cluster.forEach {
|
||||
it.client.close()
|
||||
it.server.shutdown()
|
||||
|
@ -15,7 +15,6 @@ import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
import net.corda.netmap.VisualiserViewModel.Style
|
||||
import net.corda.netmap.simulation.IRSSimulation
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.statemachine.SessionConfirm
|
||||
import net.corda.node.services.statemachine.SessionEnd
|
||||
import net.corda.node.services.statemachine.SessionInit
|
||||
|
@ -28,7 +28,6 @@ import net.corda.node.internal.StartedNode
|
||||
import net.corda.node.internal.cordapp.CordappLoader
|
||||
import net.corda.node.services.Permissions.Companion.invokeRpc
|
||||
import net.corda.node.services.config.*
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.utilities.ServiceIdentityGenerator
|
||||
import net.corda.nodeapi.NodeInfoFilesCopier
|
||||
import net.corda.nodeapi.User
|
||||
@ -492,7 +491,7 @@ fun addressMustBeBoundFuture(executorService: ScheduledExecutorService, hostAndP
|
||||
|
||||
/*
|
||||
* The default timeout value of 40 seconds have been chosen based on previous node shutdown time estimate.
|
||||
* It's been observed that nodes can take up to 30 seconds to shut down, so just to stay on the safe side the 40 seconds
|
||||
* It's been observed that nodes can take up to 30 seconds to shut down, so just to stay on the safe side the 60 seconds
|
||||
* timeout has been chosen.
|
||||
*/
|
||||
fun addressMustNotBeBound(executorService: ScheduledExecutorService, hostAndPort: NetworkHostAndPort, timeout: Duration = 40.seconds) {
|
||||
|
@ -23,15 +23,12 @@ import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.internal.AbstractNode
|
||||
import net.corda.node.internal.StartedNode
|
||||
import net.corda.node.internal.cordapp.CordappLoader
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.node.services.api.SchemaService
|
||||
import net.corda.node.services.config.BFTSMaRtConfiguration
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.NotaryConfig
|
||||
import net.corda.node.services.keys.E2ETestKeyManagementService
|
||||
import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.node.services.network.InMemoryNetworkMapService
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.transactions.BFTNonValidatingNotaryService
|
||||
import net.corda.node.services.transactions.BFTSMaRt
|
||||
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
|
||||
@ -213,10 +210,6 @@ class MockNetwork(defaultParameters: MockNetworkParameters = MockNetworkParamete
|
||||
// Nothing to do
|
||||
}
|
||||
|
||||
override fun makeNetworkMapService(network: MessagingService, networkMapCache: NetworkMapCacheInternal): NetworkMapService {
|
||||
return InMemoryNetworkMapService()
|
||||
}
|
||||
|
||||
// This is not thread safe, but node construction is done on a single thread, so that should always be fine
|
||||
override fun generateKeyPair(): KeyPair {
|
||||
counter = counter.add(BigInteger.ONE)
|
||||
|
Loading…
x
Reference in New Issue
Block a user