mirror of
https://github.com/corda/corda.git
synced 2025-04-06 19:07:08 +00:00
CORDA-599 Remove dependency of NetworkMapService impls on ServiceHub (#1713)
This commit is contained in:
parent
91e2249410
commit
894f05d84e
@ -33,6 +33,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.node.VersionInfo
|
||||
import net.corda.node.internal.classloading.requireAnnotation
|
||||
import net.corda.node.internal.cordapp.CordappLoader
|
||||
import net.corda.node.internal.cordapp.CordappProviderImpl
|
||||
@ -95,10 +96,17 @@ import net.corda.core.crypto.generateKeyPair as cryptoGenerateKeyPair
|
||||
// TODO: Where this node is the initial network map service, currently no networkMapService is provided.
|
||||
// In theory the NodeInfo for the node should be passed in, instead, however currently this is constructed by the
|
||||
// AbstractNode. It should be possible to generate the NodeInfo outside of AbstractNode, so it can be passed in.
|
||||
abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
abstract class AbstractNode(config: NodeConfiguration,
|
||||
val advertisedServices: Set<ServiceInfo>,
|
||||
val platformClock: Clock,
|
||||
protected val versionInfo: VersionInfo,
|
||||
@VisibleForTesting val busyNodeLatch: ReusableLatch = ReusableLatch()) : SingletonSerializeAsToken() {
|
||||
open val configuration = config.apply {
|
||||
require(minimumPlatformVersion <= versionInfo.platformVersion) {
|
||||
"minimumPlatformVersion cannot be greater than the node's own version"
|
||||
}
|
||||
}
|
||||
|
||||
private class StartedNodeImpl<out N : AbstractNode>(
|
||||
override val internals: N,
|
||||
override val services: ServiceHubInternalImpl,
|
||||
@ -119,7 +127,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
|
||||
protected abstract val log: Logger
|
||||
protected abstract val networkMapAddress: SingleMessageRecipient?
|
||||
protected abstract val platformVersion: Int
|
||||
|
||||
// We will run as much stuff in this single thread as possible to keep the risk of thread safety bugs low during the
|
||||
// low-performance prototyping period.
|
||||
@ -452,13 +459,13 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
legalIdentity = obtainIdentity()
|
||||
network = makeMessagingService(legalIdentity)
|
||||
info = makeInfo(legalIdentity)
|
||||
|
||||
val networkMapCache = services.networkMapCache
|
||||
val tokenizableServices = mutableListOf(attachments, network, services.vaultService,
|
||||
services.keyManagementService, services.identityService, platformClock, services.schedulerService,
|
||||
services.auditService, services.monitoringService, services.networkMapCache, services.schemaService,
|
||||
services.auditService, services.monitoringService, networkMapCache, services.schemaService,
|
||||
services.transactionVerifierService, services.validatedTransactions, services.contractUpgradeService,
|
||||
services, cordappProvider, this)
|
||||
makeNetworkServices(tokenizableServices)
|
||||
makeNetworkServices(network, networkMapCache, tokenizableServices)
|
||||
return tokenizableServices
|
||||
}
|
||||
|
||||
@ -489,7 +496,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
val allIdentitiesList = mutableListOf(legalIdentity)
|
||||
myNotaryIdentity?.let { allIdentitiesList.add(it) }
|
||||
val addresses = myAddresses() // TODO There is no support for multiple IP addresses yet.
|
||||
return NodeInfo(addresses, allIdentitiesList, platformVersion, platformClock.instant().toEpochMilli())
|
||||
return NodeInfo(addresses, allIdentitiesList, versionInfo.platformVersion, platformClock.instant().toEpochMilli())
|
||||
}
|
||||
|
||||
/**
|
||||
@ -550,9 +557,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
}
|
||||
}
|
||||
|
||||
private fun makeNetworkServices(tokenizableServices: MutableList<Any>) {
|
||||
private fun makeNetworkServices(network: MessagingService, networkMapCache: NetworkMapCacheInternal, tokenizableServices: MutableList<Any>) {
|
||||
val serviceTypes = advertisedServices.map { it.type }
|
||||
inNodeNetworkMapService = if (configuration.networkMapService == null) makeNetworkMapService() else NullNetworkMapService
|
||||
inNodeNetworkMapService = if (configuration.networkMapService == null) makeNetworkMapService(network, networkMapCache) else NullNetworkMapService
|
||||
val notaryServiceType = serviceTypes.singleOrNull { it.isNotary() }
|
||||
if (notaryServiceType != null) {
|
||||
val service = makeCoreNotaryService(notaryServiceType)
|
||||
@ -631,9 +638,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
return PersistentKeyManagementService(identityService, partyKeys)
|
||||
}
|
||||
|
||||
open protected fun makeNetworkMapService(): NetworkMapService {
|
||||
return PersistentNetworkMapService(services, configuration.minimumPlatformVersion)
|
||||
}
|
||||
abstract protected fun makeNetworkMapService(network: MessagingService, networkMapCache: NetworkMapCacheInternal): NetworkMapService
|
||||
|
||||
open protected fun makeCoreNotaryService(type: ServiceType): NotaryService? {
|
||||
check(myNotaryIdentity != null) { "No notary identity initialized when creating a notary service" }
|
||||
|
@ -19,6 +19,7 @@ import net.corda.node.serialization.KryoServerSerializationScheme
|
||||
import net.corda.node.serialization.NodeClock
|
||||
import net.corda.node.services.RPCUserService
|
||||
import net.corda.node.services.RPCUserServiceImpl
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.node.services.api.SchemaService
|
||||
import net.corda.nodeapi.internal.ServiceInfo
|
||||
import net.corda.node.services.config.FullNodeConfiguration
|
||||
@ -27,6 +28,8 @@ import net.corda.node.services.messaging.ArtemisMessagingServer.Companion.ipDete
|
||||
import net.corda.node.services.messaging.ArtemisMessagingServer.Companion.ipDetectResponseProperty
|
||||
import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.node.services.messaging.NodeMessagingClient
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.network.PersistentNetworkMapService
|
||||
import net.corda.node.utilities.AddressUtils
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.node.utilities.TestClock
|
||||
@ -62,9 +65,9 @@ import kotlin.system.exitProcess
|
||||
*/
|
||||
open class Node(override val configuration: FullNodeConfiguration,
|
||||
advertisedServices: Set<ServiceInfo>,
|
||||
private val versionInfo: VersionInfo,
|
||||
versionInfo: VersionInfo,
|
||||
val initialiseSerialization: Boolean = true
|
||||
) : AbstractNode(configuration, advertisedServices, createClock(configuration)) {
|
||||
) : AbstractNode(configuration, advertisedServices, createClock(configuration), versionInfo) {
|
||||
companion object {
|
||||
private val logger = loggerFor<Node>()
|
||||
var renderBasicInfoToConsole = true
|
||||
@ -90,7 +93,6 @@ open class Node(override val configuration: FullNodeConfiguration,
|
||||
}
|
||||
|
||||
override val log: Logger get() = logger
|
||||
override val platformVersion: Int get() = versionInfo.platformVersion
|
||||
override val networkMapAddress: NetworkMapAddress? get() = configuration.networkMapService?.address?.let(::NetworkMapAddress)
|
||||
override fun makeTransactionVerifierService() = (network as NodeMessagingClient).verifierService
|
||||
|
||||
@ -277,6 +279,10 @@ open class Node(override val configuration: FullNodeConfiguration,
|
||||
return listOf(address.hostAndPort)
|
||||
}
|
||||
|
||||
override fun makeNetworkMapService(network: MessagingService, networkMapCache: NetworkMapCacheInternal): NetworkMapService {
|
||||
return PersistentNetworkMapService(network, networkMapCache, configuration.minimumPlatformVersion)
|
||||
}
|
||||
|
||||
/**
|
||||
* If the node is persisting to an embedded H2 database, then expose this via TCP with a JDBC URL of the form:
|
||||
* jdbc:h2:tcp://<host>:<port>/node
|
||||
|
@ -10,10 +10,7 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
* Abstract superclass for services that a node can host, which provides helper functions.
|
||||
*/
|
||||
@ThreadSafe
|
||||
abstract class AbstractNodeService(val services: ServiceHubInternal) : SingletonSerializeAsToken() {
|
||||
|
||||
val network: MessagingService get() = services.networkService
|
||||
|
||||
abstract class AbstractNodeService(val network: MessagingService) : SingletonSerializeAsToken() {
|
||||
/**
|
||||
* Register a handler for a message topic. In comparison to using net.addMessageHandler() this manages a lot of
|
||||
* common boilerplate code. Exceptions are caught and passed to the provided consumer. If you just want a simple
|
||||
@ -36,7 +33,7 @@ abstract class AbstractNodeService(val services: ServiceHubInternal) : Singleton
|
||||
val msg = network.createMessage(topic, request.sessionID, response.serialize().bytes)
|
||||
network.send(msg, request.replyTo)
|
||||
}
|
||||
} catch(e: Exception) {
|
||||
} catch (e: Exception) {
|
||||
exceptionConsumer(message, e)
|
||||
}
|
||||
}
|
||||
|
@ -93,6 +93,7 @@ data class FullNodeConfiguration(
|
||||
require(it.username.matches("\\w+".toRegex())) { "Username ${it.username} contains invalid characters" }
|
||||
}
|
||||
require(myLegalName.commonName == null) { "Common name must be null: $myLegalName" }
|
||||
require(minimumPlatformVersion >= 1) { "minimumPlatformVersion cannot be less than 1" }
|
||||
}
|
||||
|
||||
fun calculateServices(): Set<ServiceInfo> {
|
||||
|
@ -20,8 +20,9 @@ import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.api.AbstractNodeService
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.node.services.messaging.MessageHandlerRegistration
|
||||
import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.node.services.messaging.ServiceRequestMessage
|
||||
import net.corda.node.services.messaging.createMessage
|
||||
import net.corda.node.services.network.NetworkMapService.*
|
||||
@ -114,8 +115,8 @@ interface NetworkMapService {
|
||||
object NullNetworkMapService : NetworkMapService
|
||||
|
||||
@ThreadSafe
|
||||
class InMemoryNetworkMapService(services: ServiceHubInternal, minimumPlatformVersion: Int)
|
||||
: AbstractNetworkMapService(services, minimumPlatformVersion) {
|
||||
class InMemoryNetworkMapService(network: MessagingService, networkMapCache: NetworkMapCacheInternal, minimumPlatformVersion: Int)
|
||||
: AbstractNetworkMapService(network, networkMapCache, minimumPlatformVersion) {
|
||||
|
||||
override val nodeRegistrations: MutableMap<PartyAndCertificate, NodeRegistrationInfo> = ConcurrentHashMap()
|
||||
override val subscribers = ThreadBox(mutableMapOf<SingleMessageRecipient, LastAcknowledgeInfo>())
|
||||
@ -132,8 +133,9 @@ class InMemoryNetworkMapService(services: ServiceHubInternal, minimumPlatformVer
|
||||
* subscriber clean up and is simpler to persist than the previous implementation based on a set of missing messages acks.
|
||||
*/
|
||||
@ThreadSafe
|
||||
abstract class AbstractNetworkMapService(services: ServiceHubInternal,
|
||||
val minimumPlatformVersion: Int) : NetworkMapService, AbstractNodeService(services) {
|
||||
abstract class AbstractNetworkMapService(network: MessagingService,
|
||||
private val networkMapCache: NetworkMapCacheInternal,
|
||||
private val minimumPlatformVersion: Int) : NetworkMapService, AbstractNodeService(network) {
|
||||
companion object {
|
||||
/**
|
||||
* Maximum credible size for a registration request. Generally requests are around 2000-6000 bytes, so this gives a
|
||||
@ -158,14 +160,6 @@ abstract class AbstractNetworkMapService(services: ServiceHubInternal,
|
||||
val maxUnacknowledgedUpdates = 10
|
||||
|
||||
private val handlers = ArrayList<MessageHandlerRegistration>()
|
||||
|
||||
init {
|
||||
require(minimumPlatformVersion >= 1) { "minimumPlatformVersion cannot be less than 1" }
|
||||
require(minimumPlatformVersion <= services.myInfo.platformVersion) {
|
||||
"minimumPlatformVersion cannot be greater than the node's own version"
|
||||
}
|
||||
}
|
||||
|
||||
protected fun setup() {
|
||||
// Register message handlers
|
||||
handlers += addMessageHandler(FETCH_TOPIC) { req: FetchMapRequest -> processFetchAllRequest(req) }
|
||||
@ -200,7 +194,7 @@ abstract class AbstractNetworkMapService(services: ServiceHubInternal,
|
||||
subscribers.locked { remove(subscriber) }
|
||||
}
|
||||
|
||||
private fun processAcknowledge(request: UpdateAcknowledge): Unit {
|
||||
private fun processAcknowledge(request: UpdateAcknowledge) {
|
||||
if (request.replyTo !is SingleMessageRecipient) throw NodeMapException.InvalidSubscriber()
|
||||
subscribers.locked {
|
||||
val lastVersionAcked = this[request.replyTo]?.mapVersion
|
||||
@ -280,11 +274,11 @@ abstract class AbstractNetworkMapService(services: ServiceHubInternal,
|
||||
when (change.type) {
|
||||
ADD -> {
|
||||
logger.info("Added node ${node.addresses} to network map")
|
||||
services.networkMapCache.addNode(change.node)
|
||||
networkMapCache.addNode(change.node)
|
||||
}
|
||||
REMOVE -> {
|
||||
logger.info("Removed node ${node.addresses} from network map")
|
||||
services.networkMapCache.removeNode(change.node)
|
||||
networkMapCache.removeNode(change.node)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -7,7 +7,8 @@ import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.serialization.SerializationDefaults
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.node.utilities.*
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent
|
||||
import java.io.ByteArrayInputStream
|
||||
@ -23,8 +24,8 @@ import java.util.*
|
||||
* This class needs database transactions to be in-flight during method calls and init, otherwise it will throw
|
||||
* exceptions.
|
||||
*/
|
||||
class PersistentNetworkMapService(services: ServiceHubInternal, minimumPlatformVersion: Int)
|
||||
: AbstractNetworkMapService(services, minimumPlatformVersion) {
|
||||
class PersistentNetworkMapService(network: MessagingService, networkMapCache: NetworkMapCacheInternal, minimumPlatformVersion: Int)
|
||||
: AbstractNetworkMapService(network, networkMapCache, minimumPlatformVersion) {
|
||||
|
||||
// Only the node_party_path column is needed to reconstruct a PartyAndCertificate but we have the others for human readability
|
||||
@Entity
|
||||
|
@ -7,6 +7,8 @@ import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.node.internal.StartedNode
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.nodeapi.internal.ServiceInfo
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.node.services.messaging.send
|
||||
@ -20,11 +22,9 @@ import net.corda.node.services.network.NetworkMapService.Companion.PUSH_TOPIC
|
||||
import net.corda.node.services.network.NetworkMapService.Companion.QUERY_TOPIC
|
||||
import net.corda.node.services.network.NetworkMapService.Companion.REGISTER_TOPIC
|
||||
import net.corda.node.services.network.NetworkMapService.Companion.SUBSCRIPTION_TOPIC
|
||||
import net.corda.node.services.transactions.SimpleNotaryService
|
||||
import net.corda.node.utilities.AddOrRemove
|
||||
import net.corda.node.utilities.AddOrRemove.ADD
|
||||
import net.corda.node.utilities.AddOrRemove.REMOVE
|
||||
import net.corda.nodeapi.internal.ServiceInfo
|
||||
import net.corda.testing.*
|
||||
import net.corda.testing.node.MockNetwork
|
||||
import net.corda.testing.node.MockNetwork.MockNode
|
||||
@ -275,7 +275,7 @@ abstract class AbstractNetworkMapServiceTest<out S : AbstractNetworkMapService>
|
||||
notaryIdentity: Pair<ServiceInfo, KeyPair>?,
|
||||
entropyRoot: BigInteger): MockNode {
|
||||
return object : MockNode(config, network, null, advertisedServices, id, notaryIdentity, entropyRoot) {
|
||||
override fun makeNetworkMapService() = NullNetworkMapService
|
||||
override fun makeNetworkMapService(network: MessagingService, networkMapCache: NetworkMapCacheInternal) = NullNetworkMapService
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,11 +1,13 @@
|
||||
package net.corda.node.services.network
|
||||
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.nodeapi.internal.ServiceInfo
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.testing.node.MockNetwork
|
||||
import net.corda.testing.node.MockNetwork.MockNode
|
||||
import net.corda.testing.node.MockServices.Companion.MOCK_VERSION_INFO
|
||||
import java.math.BigInteger
|
||||
import java.security.KeyPair
|
||||
|
||||
@ -35,7 +37,7 @@ class PersistentNetworkMapServiceTest : AbstractNetworkMapServiceTest<Persistent
|
||||
notaryIdentity: Pair<ServiceInfo, KeyPair>?,
|
||||
entropyRoot: BigInteger): MockNode {
|
||||
return object : MockNode(config, network, networkMapAddr, advertisedServices, id, notaryIdentity, entropyRoot) {
|
||||
override fun makeNetworkMapService() = SwizzleNetworkMapService(services)
|
||||
override fun makeNetworkMapService(network: MessagingService, networkMapCache: NetworkMapCacheInternal) = SwizzleNetworkMapService(network, networkMapCache)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -44,12 +46,13 @@ class PersistentNetworkMapServiceTest : AbstractNetworkMapServiceTest<Persistent
|
||||
* We use a special [NetworkMapService] that allows us to switch in a new instance at any time to check that the
|
||||
* state within it is correctly restored.
|
||||
*/
|
||||
private class SwizzleNetworkMapService(val services: ServiceHubInternal) : NetworkMapService {
|
||||
var delegate: PersistentNetworkMapService = PersistentNetworkMapService(services, 1)
|
||||
private class SwizzleNetworkMapService(private val delegateFactory: () -> PersistentNetworkMapService) : NetworkMapService {
|
||||
constructor(network: MessagingService, networkMapCache: NetworkMapCacheInternal) : this({ PersistentNetworkMapService(network, networkMapCache, 1) })
|
||||
|
||||
var delegate = delegateFactory()
|
||||
fun swizzle() {
|
||||
delegate.unregisterNetworkHandlers()
|
||||
delegate = PersistentNetworkMapService(services, 1)
|
||||
delegate = delegateFactory()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -27,6 +27,9 @@ import net.corda.core.utilities.loggerFor
|
||||
import net.corda.finance.utils.WorldMapLocation
|
||||
import net.corda.node.internal.AbstractNode
|
||||
import net.corda.node.internal.StartedNode
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.nodeapi.internal.ServiceInfo
|
||||
import net.corda.nodeapi.internal.ServiceType
|
||||
import net.corda.node.services.api.SchemaService
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.identity.PersistentIdentityService
|
||||
@ -41,9 +44,8 @@ import net.corda.node.services.transactions.ValidatingNotaryService
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
|
||||
import net.corda.node.utilities.CertificateAndKeyPair
|
||||
import net.corda.nodeapi.internal.ServiceInfo
|
||||
import net.corda.nodeapi.internal.ServiceType
|
||||
import net.corda.testing.*
|
||||
import net.corda.testing.node.MockServices.Companion.MOCK_VERSION_INFO
|
||||
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch
|
||||
import org.slf4j.Logger
|
||||
@ -158,10 +160,9 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
|
||||
val id: Int,
|
||||
internal val notaryIdentity: Pair<ServiceInfo, KeyPair>?,
|
||||
val entropyRoot: BigInteger = BigInteger.valueOf(random63BitValue())) :
|
||||
AbstractNode(config, advertisedServices, TestClock(), mockNet.busyLatch) {
|
||||
AbstractNode(config, advertisedServices, TestClock(), MOCK_VERSION_INFO, mockNet.busyLatch) {
|
||||
var counter = entropyRoot
|
||||
override val log: Logger = loggerFor<MockNode>()
|
||||
override val platformVersion: Int get() = 1
|
||||
override val serverThread: AffinityExecutor =
|
||||
if (mockNet.threadPerNode)
|
||||
ServiceAffinityExecutor("Mock node $id thread", 1)
|
||||
@ -215,8 +216,8 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
|
||||
// Nothing to do
|
||||
}
|
||||
|
||||
override fun makeNetworkMapService(): NetworkMapService {
|
||||
return InMemoryNetworkMapService(services, platformVersion)
|
||||
override fun makeNetworkMapService(network: MessagingService, networkMapCache: NetworkMapCacheInternal): NetworkMapService {
|
||||
return InMemoryNetworkMapService(network, networkMapCache, 1)
|
||||
}
|
||||
|
||||
override fun getNotaryIdentity(): PartyAndCertificate? {
|
||||
|
Loading…
x
Reference in New Issue
Block a user