mirror of
https://github.com/corda/corda.git
synced 2025-06-17 06:38:21 +00:00
[CORDA-446] Clean up other mentions of network map node and logic (#1974)
* [CORDA-446] Clean up other mentions of network map node and logic * Rename AbstractNetworkMapService to NetworkMapService and remove the empty NetworkMapService * fix build * fix artemismessaging tests * pr comments
This commit is contained in:
@ -42,10 +42,7 @@ import net.corda.node.services.events.ScheduledActivityObserver
|
||||
import net.corda.node.services.identity.PersistentIdentityService
|
||||
import net.corda.node.services.keys.PersistentKeyManagementService
|
||||
import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.node.services.messaging.sendRequest
|
||||
import net.corda.node.services.network.*
|
||||
import net.corda.node.services.network.NetworkMapService.RegistrationRequest
|
||||
import net.corda.node.services.network.NetworkMapService.RegistrationResponse
|
||||
import net.corda.node.services.persistence.DBCheckpointStorage
|
||||
import net.corda.node.services.persistence.DBTransactionMappingStorage
|
||||
import net.corda.node.services.persistence.DBTransactionStorage
|
||||
@ -58,7 +55,6 @@ import net.corda.node.services.upgrade.ContractUpgradeServiceImpl
|
||||
import net.corda.node.services.vault.NodeVaultService
|
||||
import net.corda.node.services.vault.VaultSoftLockManager
|
||||
import net.corda.node.utilities.*
|
||||
import net.corda.node.utilities.AddOrRemove.ADD
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch
|
||||
import org.slf4j.Logger
|
||||
import rx.Observable
|
||||
@ -553,15 +549,6 @@ abstract class AbstractNode(config: NodeConfiguration,
|
||||
}
|
||||
}
|
||||
|
||||
private fun sendNetworkMapRegistration(networkMapAddress: SingleMessageRecipient): CordaFuture<RegistrationResponse> {
|
||||
// Register this node against the network
|
||||
val instant = platformClock.instant()
|
||||
val expires = instant + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
|
||||
val reg = NodeRegistration(info, info.serial, ADD, expires)
|
||||
val request = RegistrationRequest(reg.toWire(services.keyManagementService, info.legalIdentitiesAndCerts.first().owningKey), network.myAddress)
|
||||
return network.sendRequest(NetworkMapService.REGISTER_TOPIC, request, networkMapAddress)
|
||||
}
|
||||
|
||||
/** Return list of node's addresses. It's overridden in MockNetwork as we don't have real addresses for MockNodes. */
|
||||
protected abstract fun myAddresses(): List<NetworkHostAndPort>
|
||||
|
||||
|
@ -11,7 +11,8 @@ import net.corda.core.internal.uncheckedCast
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.serialization.SerializationDefaults
|
||||
import net.corda.core.utilities.*
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.VersionInfo
|
||||
import net.corda.node.internal.cordapp.CordappLoader
|
||||
import net.corda.node.serialization.KryoServerSerializationScheme
|
||||
@ -22,8 +23,6 @@ import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.node.services.api.SchemaService
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||
import net.corda.node.services.messaging.ArtemisMessagingServer.Companion.ipDetectRequestProperty
|
||||
import net.corda.node.services.messaging.ArtemisMessagingServer.Companion.ipDetectResponseProperty
|
||||
import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.node.services.messaging.NodeMessagingClient
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
@ -32,10 +31,6 @@ import net.corda.node.utilities.AddressUtils
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.node.utilities.TestClock
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.IP_REQUEST_PREFIX
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.PEER_USER
|
||||
import net.corda.nodeapi.ArtemisTcpTransport
|
||||
import net.corda.nodeapi.ConnectionDirection
|
||||
import net.corda.nodeapi.internal.ShutdownHook
|
||||
import net.corda.nodeapi.internal.addShutdownHook
|
||||
import net.corda.nodeapi.internal.serialization.*
|
||||
@ -46,9 +41,7 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.io.IOException
|
||||
import java.time.Clock
|
||||
import java.util.*
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import javax.management.ObjectName
|
||||
import kotlin.system.exitProcess
|
||||
@ -160,7 +153,6 @@ open class Node(override val configuration: NodeConfiguration,
|
||||
legalIdentity.owningKey,
|
||||
serverThread,
|
||||
database,
|
||||
nodeReadyFuture,
|
||||
services.monitoringService,
|
||||
advertisedAddress)
|
||||
}
|
||||
@ -200,53 +192,6 @@ open class Node(override val configuration: NodeConfiguration,
|
||||
return null
|
||||
}
|
||||
|
||||
/**
|
||||
* Asks the network map service to provide this node's public IP address:
|
||||
* - Connects to the network map service's message broker and creates a special IP request queue with a custom
|
||||
* request id. Marks the established session with the same request id.
|
||||
* - On the server side a special post-queue-creation callback is fired. It finds the session matching the request id
|
||||
* encoded in the queue name. It then extracts the remote IP from the session details and posts a message containing
|
||||
* it back to the queue.
|
||||
* - Once the message is received the session is closed and the queue deleted.
|
||||
*/
|
||||
private fun discoverPublicHost(serverAddress: NetworkHostAndPort): String? {
|
||||
log.trace { "Trying to detect public hostname through the Network Map Service at $serverAddress" }
|
||||
val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverAddress, configuration)
|
||||
val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
|
||||
initialConnectAttempts = 2 // TODO Public host discovery needs rewriting, as we may start nodes without network map, and we don't want to wait that long on startup.
|
||||
retryInterval = 2.seconds.toMillis()
|
||||
retryIntervalMultiplier = 1.5
|
||||
maxRetryInterval = 3.minutes.toMillis()
|
||||
}
|
||||
val clientFactory = try {
|
||||
locator.createSessionFactory()
|
||||
} catch (e: ActiveMQNotConnectedException) {
|
||||
log.warn("Unable to connect to the Network Map Service at $serverAddress for IP address discovery. " +
|
||||
"Using the provided \"${configuration.p2pAddress.host}\" as the advertised address.")
|
||||
return null
|
||||
}
|
||||
|
||||
val session = clientFactory.createSession(PEER_USER, PEER_USER, false, true, true, locator.isPreAcknowledge, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)
|
||||
val requestId = UUID.randomUUID().toString()
|
||||
session.addMetaData(ipDetectRequestProperty, requestId)
|
||||
session.start()
|
||||
|
||||
val queueName = "$IP_REQUEST_PREFIX$requestId"
|
||||
session.createQueue(queueName, RoutingType.MULTICAST, queueName, false)
|
||||
|
||||
val consumer = session.createConsumer(queueName)
|
||||
val artemisMessage: ClientMessage = consumer.receive(10.seconds.toMillis()) ?:
|
||||
throw IOException("Did not receive a response from the Network Map Service at $serverAddress")
|
||||
val publicHostAndPort = artemisMessage.getStringProperty(ipDetectResponseProperty)
|
||||
log.info("Detected public address: $publicHostAndPort")
|
||||
|
||||
consumer.close()
|
||||
session.deleteQueue(queueName)
|
||||
clientFactory.close()
|
||||
|
||||
return NetworkHostAndPort.parse(publicHostAndPort.removePrefix("/")).host
|
||||
}
|
||||
|
||||
override fun startMessagingService(rpcOps: RPCOps) {
|
||||
// Start up the embedded MQ server
|
||||
messageBroker?.apply {
|
||||
@ -264,7 +209,7 @@ open class Node(override val configuration: NodeConfiguration,
|
||||
}
|
||||
|
||||
override fun makeNetworkMapService(network: MessagingService, networkMapCache: NetworkMapCacheInternal): NetworkMapService {
|
||||
return PersistentNetworkMapService(network, networkMapCache, configuration.minimumPlatformVersion)
|
||||
return PersistentNetworkMapService()
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1,6 +1,5 @@
|
||||
package net.corda.node.services.api
|
||||
|
||||
import net.corda.core.CordaException
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.FlowInitiator
|
||||
@ -18,7 +17,6 @@ import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.node.services.NetworkMapCache
|
||||
import net.corda.core.node.services.NetworkMapCacheBase
|
||||
import net.corda.core.node.services.TransactionStorage
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.loggerFor
|
||||
@ -42,12 +40,6 @@ interface NetworkMapCacheBaseInternal : NetworkMapCacheBase {
|
||||
val loadDBSuccess: Boolean
|
||||
}
|
||||
|
||||
@CordaSerializable
|
||||
sealed class NetworkCacheException : CordaException("Network Cache Error") {
|
||||
/** Indicates a failure to deregister, because of a rejected request from the remote node */
|
||||
class DeregistrationFailed : NetworkCacheException()
|
||||
}
|
||||
|
||||
interface ServiceHubInternal : ServiceHub {
|
||||
companion object {
|
||||
private val log = loggerFor<ServiceHubInternal>()
|
||||
|
@ -116,7 +116,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
|
||||
|
||||
/**
|
||||
* The server will make sure the bridge exists on network map changes, see method [updateBridgesOnNetworkChange]
|
||||
* We assume network map will be updated accordingly when the client node register with the network map server.
|
||||
* We assume network map will be updated accordingly when the client node register with the network map.
|
||||
*/
|
||||
@Throws(IOException::class, KeyStoreException::class)
|
||||
fun start() = mutex.locked {
|
||||
|
@ -1,11 +1,8 @@
|
||||
package net.corda.node.services.messaging
|
||||
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.crypto.random63BitValue
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.ThreadBox
|
||||
import net.corda.core.internal.concurrent.andForget
|
||||
import net.corda.core.internal.concurrent.thenMatch
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.messaging.MessageRecipients
|
||||
import net.corda.core.messaging.RPCOps
|
||||
@ -16,7 +13,10 @@ import net.corda.core.serialization.SerializationDefaults
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.transactions.LedgerTransaction
|
||||
import net.corda.core.utilities.*
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.core.utilities.sequence
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.node.VersionInfo
|
||||
import net.corda.node.services.RPCUserService
|
||||
import net.corda.node.services.api.MonitoringService
|
||||
@ -65,8 +65,7 @@ import javax.persistence.Lob
|
||||
* CordaRPCClient class.
|
||||
*
|
||||
* @param serverAddress The address of the broker instance to connect to (might be running in the same process).
|
||||
* @param myIdentity Either the public key to be used as the ArtemisMQ address and queue name for the node globally, or null to indicate
|
||||
* that this is a NetworkMapService node which will be bound globally to the name "networkmap".
|
||||
* @param myIdentity The public key to be used as the ArtemisMQ address and queue name for the node.
|
||||
* @param nodeExecutor An executor to run received message tasks upon.
|
||||
* @param advertisedAddress The node address for inbound connections, advertised to the network map service and peers.
|
||||
* If not provided, will default to [serverAddress].
|
||||
@ -78,7 +77,6 @@ class NodeMessagingClient(override val config: NodeConfiguration,
|
||||
private val myIdentity: PublicKey,
|
||||
private val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor,
|
||||
val database: CordaPersistence,
|
||||
private val networkMapRegistrationFuture: CordaFuture<Unit>,
|
||||
val monitoringService: MonitoringService,
|
||||
advertisedAddress: NetworkHostAndPort = serverAddress
|
||||
) : ArtemisMessagingComponent(), MessagingService {
|
||||
@ -234,18 +232,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
|
||||
this.producer = producer
|
||||
|
||||
// Create a queue, consumer and producer for handling P2P network messages.
|
||||
p2pConsumer = makeP2PConsumer(session, true)
|
||||
networkMapRegistrationFuture.thenMatch({
|
||||
state.locked {
|
||||
log.info("Network map is complete, so removing filter from P2P consumer.")
|
||||
try {
|
||||
p2pConsumer!!.close()
|
||||
} catch (e: ActiveMQObjectClosedException) {
|
||||
// Ignore it: this can happen if the server has gone away before we do.
|
||||
}
|
||||
p2pConsumer = makeP2PConsumer(session, false)
|
||||
}
|
||||
}, {})
|
||||
p2pConsumer = session.createConsumer(P2P_QUEUE)
|
||||
|
||||
val myCert = loadKeyStore(config.sslKeystore, config.keyStorePassword).getX509Certificate(X509Utilities.CORDA_CLIENT_TLS)
|
||||
rpcServer = RPCServer(rpcOps, NODE_USER, NODE_USER, locator, userService, CordaX500Name.build(myCert.subjectX500Principal))
|
||||
@ -267,20 +254,6 @@ class NodeMessagingClient(override val config: NodeConfiguration,
|
||||
resumeMessageRedelivery()
|
||||
}
|
||||
|
||||
/**
|
||||
* We make the consumer twice, once to filter for just network map messages, and then once that is complete, we close
|
||||
* the original and make another without a filter. We do this so that there is a network map in place for all other
|
||||
* message handlers.
|
||||
*/
|
||||
private fun makeP2PConsumer(session: ClientSession, networkMapOnly: Boolean): ClientConsumer {
|
||||
return if (networkMapOnly) {
|
||||
// Filter for just the network map messages.
|
||||
val messageFilter = "hyphenated_props:$topicProperty like 'platform.network_map.%'"
|
||||
session.createConsumer(P2P_QUEUE, messageFilter)
|
||||
} else
|
||||
session.createConsumer(P2P_QUEUE)
|
||||
}
|
||||
|
||||
private fun resumeMessageRedelivery() {
|
||||
messagesToRedeliver.forEach { retryId, (message, target) ->
|
||||
send(message, target, retryId)
|
||||
@ -325,46 +298,22 @@ class NodeMessagingClient(override val config: NodeConfiguration,
|
||||
return true
|
||||
}
|
||||
|
||||
private fun runPreNetworkMap(serverControl: ActiveMQServerControl) {
|
||||
val consumer = state.locked {
|
||||
check(started) { "start must be called first" }
|
||||
check(!running) { "run can't be called twice" }
|
||||
running = true
|
||||
rpcServer!!.start(serverControl)
|
||||
(verifierService as? OutOfProcessTransactionVerifierService)?.start(verificationResponseConsumer!!)
|
||||
p2pConsumer!!
|
||||
}
|
||||
|
||||
while (!networkMapRegistrationFuture.isDone && processMessage(consumer)) {
|
||||
}
|
||||
with(networkMapRegistrationFuture) {
|
||||
if (isDone) getOrThrow() else andForget(log) // Trigger node shutdown here to avoid deadlock in shutdown hooks.
|
||||
}
|
||||
}
|
||||
|
||||
private fun runPostNetworkMap() {
|
||||
val consumer = state.locked {
|
||||
// If it's null, it means we already called stop, so return immediately.
|
||||
p2pConsumer ?: return
|
||||
}
|
||||
|
||||
while (processMessage(consumer)) {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts the p2p event loop: this method only returns once [stop] has been called.
|
||||
*
|
||||
* This actually runs as two sequential loops. The first subscribes for and receives only network map messages until
|
||||
* we get our network map fetch response. At that point the filtering consumer is closed and we proceed to the second loop and
|
||||
* consume all messages via a new consumer without a filter applied.
|
||||
*/
|
||||
fun run(serverControl: ActiveMQServerControl) {
|
||||
try {
|
||||
// Build the network map.
|
||||
runPreNetworkMap(serverControl)
|
||||
// Process everything else once we have the network map.
|
||||
runPostNetworkMap()
|
||||
val consumer = state.locked {
|
||||
check(started) { "start must be called first" }
|
||||
check(!running) { "run can't be called twice" }
|
||||
running = true
|
||||
rpcServer!!.start(serverControl)
|
||||
(verifierService as? OutOfProcessTransactionVerifierService)?.start(verificationResponseConsumer!!)
|
||||
// If it's null, it means we already called stop, so return immediately.
|
||||
p2pConsumer ?: return
|
||||
}
|
||||
|
||||
while (processMessage(consumer)) { }
|
||||
} finally {
|
||||
shutdownLatch.countDown()
|
||||
}
|
||||
|
@ -1,48 +1,14 @@
|
||||
package net.corda.node.services.network
|
||||
|
||||
import net.corda.core.CordaException
|
||||
import net.corda.core.crypto.DigitalSignature
|
||||
import net.corda.core.crypto.SignedData
|
||||
import net.corda.core.crypto.isFulfilledBy
|
||||
import net.corda.core.crypto.random63BitValue
|
||||
import net.corda.core.identity.PartyAndCertificate
|
||||
import net.corda.core.internal.ThreadBox
|
||||
import net.corda.core.internal.VisibleForTesting
|
||||
import net.corda.core.messaging.MessageRecipients
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.services.KeyManagementService
|
||||
import net.corda.core.node.services.NetworkMapCache
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.api.AbstractNodeService
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.node.services.messaging.MessageHandlerRegistration
|
||||
import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.node.services.messaging.ServiceRequestMessage
|
||||
import net.corda.node.services.messaging.createMessage
|
||||
import net.corda.node.services.network.NetworkMapService.*
|
||||
import net.corda.node.services.network.NetworkMapService.Companion.FETCH_TOPIC
|
||||
import net.corda.node.services.network.NetworkMapService.Companion.PUSH_ACK_TOPIC
|
||||
import net.corda.node.services.network.NetworkMapService.Companion.PUSH_TOPIC
|
||||
import net.corda.node.services.network.NetworkMapService.Companion.QUERY_TOPIC
|
||||
import net.corda.node.services.network.NetworkMapService.Companion.REGISTER_TOPIC
|
||||
import net.corda.node.services.network.NetworkMapService.Companion.SUBSCRIPTION_TOPIC
|
||||
import net.corda.node.utilities.AddOrRemove
|
||||
import net.corda.node.utilities.AddOrRemove.ADD
|
||||
import net.corda.node.utilities.AddOrRemove.REMOVE
|
||||
import java.io.IOException
|
||||
import java.security.PublicKey
|
||||
import java.security.SignatureException
|
||||
import java.time.Instant
|
||||
import java.time.Period
|
||||
import java.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
/**
|
||||
@ -59,256 +25,23 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
// It may also be that this is replaced or merged with the identity management service; for example if the network has
|
||||
// a concept of identity changes over time, should that include the node for an identity? If so, that is likely to
|
||||
// replace this service.
|
||||
@ThreadSafe
|
||||
interface NetworkMapService {
|
||||
|
||||
companion object {
|
||||
val DEFAULT_EXPIRATION_PERIOD: Period = Period.ofWeeks(4)
|
||||
const val FETCH_TOPIC = "platform.network_map.fetch"
|
||||
const val QUERY_TOPIC = "platform.network_map.query"
|
||||
const val REGISTER_TOPIC = "platform.network_map.register"
|
||||
const val SUBSCRIPTION_TOPIC = "platform.network_map.subscribe"
|
||||
// Base topic used when pushing out updates to the network map. Consumed, for example, by the map cache.
|
||||
// When subscribing to these updates, remember they must be acknowledged
|
||||
const val PUSH_TOPIC = "platform.network_map.push"
|
||||
// Base topic for messages acknowledging pushed updates
|
||||
const val PUSH_ACK_TOPIC = "platform.network_map.push_ack"
|
||||
}
|
||||
val nodeRegistrations: Map<PartyAndCertificate, NodeRegistrationInfo>
|
||||
|
||||
data class FetchMapRequest(val subscribe: Boolean,
|
||||
val ifChangedSinceVersion: Int?,
|
||||
override val replyTo: SingleMessageRecipient,
|
||||
override val sessionID: Long = random63BitValue()) : ServiceRequestMessage
|
||||
|
||||
@CordaSerializable
|
||||
data class FetchMapResponse(val nodes: List<NodeRegistration>?, val version: Int)
|
||||
|
||||
data class QueryIdentityRequest(val identity: PartyAndCertificate,
|
||||
override val replyTo: SingleMessageRecipient,
|
||||
override val sessionID: Long = random63BitValue()) : ServiceRequestMessage
|
||||
|
||||
@CordaSerializable
|
||||
data class QueryIdentityResponse(val node: NodeInfo?)
|
||||
|
||||
// TODO Rename this RegistractionChangeRequest or similar (and related classes)
|
||||
data class RegistrationRequest(val wireReg: WireNodeRegistration,
|
||||
override val replyTo: SingleMessageRecipient,
|
||||
override val sessionID: Long = random63BitValue()) : ServiceRequestMessage
|
||||
|
||||
/** If [error] is null then the registration was successful. If not null then it wasn't and it explains why */
|
||||
@CordaSerializable
|
||||
data class RegistrationResponse(val error: String?)
|
||||
|
||||
data class SubscribeRequest(val subscribe: Boolean,
|
||||
override val replyTo: SingleMessageRecipient,
|
||||
override val sessionID: Long = random63BitValue()) : ServiceRequestMessage
|
||||
|
||||
@CordaSerializable
|
||||
data class SubscribeResponse(val confirmed: Boolean)
|
||||
|
||||
@CordaSerializable
|
||||
data class Update(val wireReg: WireNodeRegistration, val mapVersion: Int, val replyTo: MessageRecipients)
|
||||
|
||||
@CordaSerializable
|
||||
data class UpdateAcknowledge(val mapVersion: Int, val replyTo: MessageRecipients)
|
||||
// Map from subscriber address, to most recently acknowledged update map version.
|
||||
val subscribers: ThreadBox<MutableMap<SingleMessageRecipient, LastAcknowledgeInfo>>
|
||||
}
|
||||
|
||||
object NullNetworkMapService : NetworkMapService
|
||||
|
||||
@ThreadSafe
|
||||
class InMemoryNetworkMapService(network: MessagingService, networkMapCache: NetworkMapCacheInternal, minimumPlatformVersion: Int)
|
||||
: AbstractNetworkMapService(network, networkMapCache, minimumPlatformVersion) {
|
||||
class InMemoryNetworkMapService: NetworkMapService {
|
||||
|
||||
override val nodeRegistrations: MutableMap<PartyAndCertificate, NodeRegistrationInfo> = ConcurrentHashMap()
|
||||
override val subscribers = ThreadBox(mutableMapOf<SingleMessageRecipient, LastAcknowledgeInfo>())
|
||||
|
||||
init {
|
||||
setup()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Abstracted out core functionality as the basis for a persistent implementation, as well as existing in-memory implementation.
|
||||
*
|
||||
* Design is slightly refactored to track time and map version of last acknowledge per subscriber to facilitate
|
||||
* subscriber clean up and is simpler to persist than the previous implementation based on a set of missing messages acks.
|
||||
*/
|
||||
@ThreadSafe
|
||||
abstract class AbstractNetworkMapService(network: MessagingService,
|
||||
private val networkMapCache: NetworkMapCacheInternal,
|
||||
private val minimumPlatformVersion: Int) : NetworkMapService, AbstractNodeService(network) {
|
||||
companion object {
|
||||
/**
|
||||
* Maximum credible size for a registration request. Generally requests are around 2000-6000 bytes, so this gives a
|
||||
* 10 times overhead.
|
||||
*/
|
||||
private const val MAX_SIZE_REGISTRATION_REQUEST_BYTES = 40000
|
||||
private val logger = loggerFor<AbstractNetworkMapService>()
|
||||
}
|
||||
|
||||
protected abstract val nodeRegistrations: MutableMap<PartyAndCertificate, NodeRegistrationInfo>
|
||||
|
||||
// Map from subscriber address, to most recently acknowledged update map version.
|
||||
protected abstract val subscribers: ThreadBox<MutableMap<SingleMessageRecipient, LastAcknowledgeInfo>>
|
||||
|
||||
protected val _mapVersion = AtomicInteger(0)
|
||||
|
||||
@VisibleForTesting
|
||||
val mapVersion: Int
|
||||
get() = _mapVersion.get()
|
||||
|
||||
/** Maximum number of unacknowledged updates to send to a node before automatically unregistering them for updates */
|
||||
val maxUnacknowledgedUpdates = 10
|
||||
|
||||
private val handlers = ArrayList<MessageHandlerRegistration>()
|
||||
protected fun setup() {
|
||||
// Register message handlers
|
||||
handlers += addMessageHandler(FETCH_TOPIC) { req: FetchMapRequest -> processFetchAllRequest(req) }
|
||||
handlers += addMessageHandler(QUERY_TOPIC) { req: QueryIdentityRequest -> processQueryRequest(req) }
|
||||
handlers += addMessageHandler(REGISTER_TOPIC) { req: RegistrationRequest -> processRegistrationRequest(req) }
|
||||
handlers += addMessageHandler(SUBSCRIPTION_TOPIC) { req: SubscribeRequest -> processSubscriptionRequest(req) }
|
||||
handlers += network.addMessageHandler(PUSH_ACK_TOPIC) { message, _ ->
|
||||
val req = message.data.deserialize<UpdateAcknowledge>()
|
||||
processAcknowledge(req)
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
fun unregisterNetworkHandlers() {
|
||||
for (handler in handlers) {
|
||||
network.removeMessageHandler(handler)
|
||||
}
|
||||
handlers.clear()
|
||||
}
|
||||
|
||||
private fun addSubscriber(subscriber: MessageRecipients) {
|
||||
if (subscriber !is SingleMessageRecipient) throw NodeMapException.InvalidSubscriber()
|
||||
subscribers.locked {
|
||||
if (!containsKey(subscriber)) {
|
||||
put(subscriber, LastAcknowledgeInfo(mapVersion))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun removeSubscriber(subscriber: MessageRecipients) {
|
||||
if (subscriber !is SingleMessageRecipient) throw NodeMapException.InvalidSubscriber()
|
||||
subscribers.locked { remove(subscriber) }
|
||||
}
|
||||
|
||||
private fun processAcknowledge(request: UpdateAcknowledge) {
|
||||
if (request.replyTo !is SingleMessageRecipient) throw NodeMapException.InvalidSubscriber()
|
||||
subscribers.locked {
|
||||
val lastVersionAcked = this[request.replyTo]?.mapVersion
|
||||
if ((lastVersionAcked ?: 0) < request.mapVersion) {
|
||||
this[request.replyTo] = LastAcknowledgeInfo(request.mapVersion)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun processFetchAllRequest(request: FetchMapRequest): FetchMapResponse {
|
||||
if (request.subscribe) {
|
||||
addSubscriber(request.replyTo)
|
||||
}
|
||||
val currentVersion = mapVersion
|
||||
val nodeRegistrations = if (request.ifChangedSinceVersion == null || request.ifChangedSinceVersion < currentVersion) {
|
||||
// We return back the current state of the entire map including nodes that have been removed
|
||||
ArrayList(nodeRegistrations.values.map { it.reg }) // Snapshot to avoid attempting to serialise Map internals
|
||||
} else {
|
||||
null
|
||||
}
|
||||
return FetchMapResponse(nodeRegistrations, currentVersion)
|
||||
}
|
||||
|
||||
private fun processQueryRequest(request: QueryIdentityRequest): QueryIdentityResponse {
|
||||
val candidate = nodeRegistrations[request.identity]?.reg
|
||||
// If the most recent record we have is of the node being removed from the map, then it's considered
|
||||
// as no match.
|
||||
val node = if (candidate == null || candidate.type == REMOVE) null else candidate.node
|
||||
return QueryIdentityResponse(node)
|
||||
}
|
||||
|
||||
private fun processRegistrationRequest(request: RegistrationRequest): RegistrationResponse {
|
||||
val requestSize = request.wireReg.raw.size
|
||||
logger.debug { "Received registration request of size: $requestSize" }
|
||||
if (requestSize > MAX_SIZE_REGISTRATION_REQUEST_BYTES) {
|
||||
return RegistrationResponse("Request is too big")
|
||||
}
|
||||
|
||||
val change = try {
|
||||
request.wireReg.verified()
|
||||
} catch (e: SignatureException) {
|
||||
return RegistrationResponse("Invalid signature on request")
|
||||
} catch (e: IOException) {
|
||||
val msg = "Unexpected IO exception: ${e.message}"
|
||||
logger.error(msg, e)
|
||||
return RegistrationResponse(msg)
|
||||
}
|
||||
val node = change.node
|
||||
// Get identity from signature on node's registration and use it as an index.
|
||||
val identity = node.legalIdentitiesAndCerts.singleOrNull { request.wireReg.sig.by == it.owningKey }
|
||||
identity ?: return RegistrationResponse("Key from signature on the node registration wasn't found in NodeInfo")
|
||||
|
||||
if (node.platformVersion < minimumPlatformVersion) {
|
||||
return RegistrationResponse("Minimum platform version requirement not met: $minimumPlatformVersion")
|
||||
}
|
||||
|
||||
// Update the current value atomically, so that if multiple updates come
|
||||
// in on different threads, there is no risk of a race condition while checking
|
||||
// sequence numbers.
|
||||
val registrationInfo = try {
|
||||
nodeRegistrations.compute(identity) { _, existing: NodeRegistrationInfo? ->
|
||||
require(!((existing == null || existing.reg.type == REMOVE) && change.type == REMOVE)) {
|
||||
"Attempting to de-register unknown node"
|
||||
}
|
||||
require(existing == null || existing.reg.serial < change.serial) { "Serial value is too small" }
|
||||
NodeRegistrationInfo(change, _mapVersion.incrementAndGet())
|
||||
}
|
||||
} catch (e: IllegalArgumentException) {
|
||||
return RegistrationResponse(e.message)
|
||||
}
|
||||
|
||||
notifySubscribers(request.wireReg, registrationInfo!!.mapVersion)
|
||||
|
||||
// Update the local cache
|
||||
// TODO: Once local messaging is fixed, this should go over the network layer as it does to other
|
||||
// subscribers
|
||||
when (change.type) {
|
||||
ADD -> {
|
||||
logger.info("Added node ${node.addresses} to network map")
|
||||
networkMapCache.addNode(change.node)
|
||||
}
|
||||
REMOVE -> {
|
||||
logger.info("Removed node ${node.addresses} from network map")
|
||||
networkMapCache.removeNode(change.node)
|
||||
}
|
||||
}
|
||||
|
||||
return RegistrationResponse(null)
|
||||
}
|
||||
|
||||
private fun notifySubscribers(wireReg: WireNodeRegistration, newMapVersion: Int) {
|
||||
// TODO: Once we have a better established messaging system, we can probably send
|
||||
// to a MessageRecipientGroup that nodes join/leave, rather than the network map
|
||||
// service itself managing the group
|
||||
val update = NetworkMapService.Update(wireReg, newMapVersion, network.myAddress).serialize().bytes
|
||||
val message = network.createMessage(PUSH_TOPIC, data = update)
|
||||
|
||||
subscribers.locked {
|
||||
// Remove any stale subscribers
|
||||
values.removeIf { (mapVersion) -> newMapVersion - mapVersion > maxUnacknowledgedUpdates }
|
||||
// TODO: introduce some concept of time in the condition to avoid unsubscribes when there's a message burst.
|
||||
keys.forEach { recipient -> network.send(message, recipient) }
|
||||
}
|
||||
}
|
||||
|
||||
private fun processSubscriptionRequest(request: SubscribeRequest): SubscribeResponse {
|
||||
if (request.subscribe) {
|
||||
addSubscriber(request.replyTo)
|
||||
} else {
|
||||
removeSubscriber(request.replyTo)
|
||||
}
|
||||
return SubscribeResponse(true)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A node registration state in the network map.
|
||||
@ -325,42 +58,9 @@ abstract class AbstractNetworkMapService(network: MessagingService,
|
||||
// involves providing both node and paerty, and deregistering a node involves a request with party but no node.
|
||||
@CordaSerializable
|
||||
data class NodeRegistration(val node: NodeInfo, val serial: Long, val type: AddOrRemove, var expires: Instant) {
|
||||
/**
|
||||
* Build a node registration in wire format.
|
||||
*/
|
||||
fun toWire(keyManager: KeyManagementService, publicKey: PublicKey): WireNodeRegistration {
|
||||
val regSerialized = this.serialize()
|
||||
val regSig = keyManager.sign(regSerialized.bytes, publicKey)
|
||||
|
||||
return WireNodeRegistration(regSerialized, regSig)
|
||||
}
|
||||
|
||||
override fun toString(): String = "$node #$serial ($type)"
|
||||
}
|
||||
|
||||
/**
|
||||
* A node registration and its signature as a pair.
|
||||
*/
|
||||
@CordaSerializable
|
||||
class WireNodeRegistration(raw: SerializedBytes<NodeRegistration>, sig: DigitalSignature.WithKey) : SignedData<NodeRegistration>(raw, sig) {
|
||||
@Throws(IllegalArgumentException::class)
|
||||
override fun verifyData(data: NodeRegistration) {
|
||||
// Check that the registration is fulfilled by any of node's identities.
|
||||
// TODO It may cause some problems with distributed services? We loose node's main identity. Should be all signatures instead of isFulfilledBy?
|
||||
require(data.node.legalIdentitiesAndCerts.any { it.owningKey.isFulfilledBy(sig.by) })
|
||||
}
|
||||
}
|
||||
|
||||
@CordaSerializable
|
||||
sealed class NodeMapException : CordaException("Network Map Protocol Error") {
|
||||
|
||||
/** Thrown if the signature on the node info does not match the public key for the identity */
|
||||
class InvalidSignature : NodeMapException()
|
||||
|
||||
/** Thrown if the replyTo of a subscription change message is not a single message recipient */
|
||||
class InvalidSubscriber : NodeMapException()
|
||||
}
|
||||
|
||||
@CordaSerializable
|
||||
data class LastAcknowledgeInfo(val mapVersion: Int)
|
||||
|
||||
|
@ -8,26 +8,23 @@ import net.corda.core.serialization.SerializationDefaults
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.MAX_HASH_HEX_SIZE
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.node.utilities.NODE_DATABASE_PREFIX
|
||||
import net.corda.node.utilities.PersistentMap
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent
|
||||
import java.io.ByteArrayInputStream
|
||||
import java.security.cert.CertificateFactory
|
||||
import javax.persistence.*
|
||||
import java.util.*
|
||||
import javax.persistence.*
|
||||
|
||||
/**
|
||||
* A network map service backed by a database to survive restarts of the node hosting it.
|
||||
*
|
||||
* Majority of the logic is inherited from [AbstractNetworkMapService].
|
||||
* Majority of the logic is inherited from [NetworkMapService].
|
||||
*
|
||||
* This class needs database transactions to be in-flight during method calls and init, otherwise it will throw
|
||||
* exceptions.
|
||||
*/
|
||||
class PersistentNetworkMapService(network: MessagingService, networkMapCache: NetworkMapCacheInternal, minimumPlatformVersion: Int)
|
||||
: AbstractNetworkMapService(network, networkMapCache, minimumPlatformVersion) {
|
||||
class PersistentNetworkMapService: NetworkMapService {
|
||||
|
||||
// Only the node_party_path column is needed to reconstruct a PartyAndCertificate but we have the others for human readability
|
||||
@Entity
|
||||
@ -124,10 +121,4 @@ class PersistentNetworkMapService(network: MessagingService, networkMapCache: Ne
|
||||
)
|
||||
|
||||
override val subscribers = ThreadBox(createNetworkSubscribersMap())
|
||||
|
||||
init {
|
||||
// Initialise the network map version with the current highest persisted version, or zero if there are no entries.
|
||||
_mapVersion.set(nodeRegistrations.values.map { it.mapVersion }.max() ?: 0)
|
||||
setup()
|
||||
}
|
||||
}
|
||||
|
@ -14,7 +14,6 @@ import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.configureWithDevSSLCertificate
|
||||
import net.corda.node.services.network.NetworkMapCacheImpl
|
||||
import net.corda.node.services.network.PersistentNetworkMapCache
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||
import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
|
||||
import net.corda.node.utilities.CordaPersistence
|
||||
@ -40,16 +39,19 @@ import kotlin.test.assertNull
|
||||
|
||||
//TODO This needs to be merged into P2PMessagingTest as that creates a more realistic environment
|
||||
class ArtemisMessagingTests {
|
||||
companion object {
|
||||
const val TOPIC = "platform.self"
|
||||
}
|
||||
@Rule
|
||||
@JvmField
|
||||
val testSerialization = SerializationEnvironmentRule()
|
||||
|
||||
@Rule
|
||||
@JvmField
|
||||
val temporaryFolder = TemporaryFolder()
|
||||
|
||||
val serverPort = freePort()
|
||||
val rpcPort = freePort()
|
||||
val topic = "platform.self"
|
||||
val identity = generateKeyPair()
|
||||
|
||||
lateinit var config: NodeConfiguration
|
||||
@ -130,7 +132,7 @@ class ArtemisMessagingTests {
|
||||
val receivedMessages = LinkedBlockingQueue<Message>()
|
||||
|
||||
val messagingClient = createAndStartClientAndServer(receivedMessages)
|
||||
val message = messagingClient.createMessage(topic, data = "first msg".toByteArray())
|
||||
val message = messagingClient.createMessage(TOPIC, data = "first msg".toByteArray())
|
||||
messagingClient.send(message, messagingClient.myAddress)
|
||||
|
||||
val actual: Message = receivedMessages.take()
|
||||
@ -146,15 +148,9 @@ class ArtemisMessagingTests {
|
||||
val receivedMessages = LinkedBlockingQueue<Message>()
|
||||
|
||||
val messagingClient = createAndStartClientAndServer(receivedMessages)
|
||||
val message = messagingClient.createMessage(topic, data = "first msg".toByteArray())
|
||||
val message = messagingClient.createMessage(TOPIC, data = "first msg".toByteArray())
|
||||
messagingClient.send(message, messagingClient.myAddress)
|
||||
|
||||
val networkMapMessage = messagingClient.createMessage(NetworkMapService.FETCH_TOPIC, data = "second msg".toByteArray())
|
||||
messagingClient.send(networkMapMessage, messagingClient.myAddress)
|
||||
|
||||
val actual: Message = receivedMessages.take()
|
||||
assertEquals("second msg", String(actual.data))
|
||||
assertNull(receivedMessages.poll(200, MILLISECONDS))
|
||||
settableFuture.set(Unit)
|
||||
val firstActual: Message = receivedMessages.take()
|
||||
assertEquals("first msg", String(firstActual.data))
|
||||
@ -171,17 +167,10 @@ class ArtemisMessagingTests {
|
||||
|
||||
val messagingClient = createAndStartClientAndServer(receivedMessages)
|
||||
for (iter in 1..iterations) {
|
||||
val message = messagingClient.createMessage(topic, data = "first msg $iter".toByteArray())
|
||||
val message = messagingClient.createMessage(TOPIC, data = "first msg $iter".toByteArray())
|
||||
messagingClient.send(message, messagingClient.myAddress)
|
||||
}
|
||||
|
||||
val networkMapMessage = messagingClient.createMessage(NetworkMapService.FETCH_TOPIC, data = "second msg".toByteArray())
|
||||
messagingClient.send(networkMapMessage, messagingClient.myAddress)
|
||||
|
||||
val actual: Message = receivedMessages.take()
|
||||
assertEquals("second msg", String(actual.data))
|
||||
assertNull(receivedMessages.poll(200, MILLISECONDS))
|
||||
|
||||
// Stop client and server and create afresh.
|
||||
messagingClient.stop()
|
||||
messagingServer?.stop()
|
||||
@ -205,10 +194,7 @@ class ArtemisMessagingTests {
|
||||
|
||||
val messagingClient = createMessagingClient()
|
||||
startNodeMessagingClient()
|
||||
messagingClient.addMessageHandler(topic) { message, _ ->
|
||||
receivedMessages.add(message)
|
||||
}
|
||||
messagingClient.addMessageHandler(NetworkMapService.FETCH_TOPIC) { message, _ ->
|
||||
messagingClient.addMessageHandler(TOPIC) { message, _ ->
|
||||
receivedMessages.add(message)
|
||||
}
|
||||
// Run after the handlers are added, otherwise (some of) the messages get delivered and discarded / dead-lettered.
|
||||
@ -225,7 +211,6 @@ class ArtemisMessagingTests {
|
||||
identity.public,
|
||||
ServiceAffinityExecutor("ArtemisMessagingTests", 1),
|
||||
database,
|
||||
networkMapRegistrationFuture,
|
||||
MonitoringService(MetricRegistry())).apply {
|
||||
config.configureWithDevSSLCertificate()
|
||||
messagingClient = this
|
||||
|
Reference in New Issue
Block a user