From 8a42da53626dd204a114da4f8a3b1e4fe1350b63 Mon Sep 17 00:00:00 2001 From: Ross Nicoll Date: Wed, 27 Apr 2016 14:57:25 +0100 Subject: [PATCH] Add network map service --- .../core/node/services/AbstractNodeService.kt | 36 ++- .../core/node/services/NetworkMapCache.kt | 121 +++++++ .../core/node/services/NetworkMapService.kt | 299 ++++++++++++++++++ 3 files changed, 447 insertions(+), 9 deletions(-) create mode 100644 src/main/kotlin/core/node/services/NetworkMapService.kt diff --git a/src/main/kotlin/core/node/services/AbstractNodeService.kt b/src/main/kotlin/core/node/services/AbstractNodeService.kt index 4b0ac71db4..1e5b245e35 100644 --- a/src/main/kotlin/core/node/services/AbstractNodeService.kt +++ b/src/main/kotlin/core/node/services/AbstractNodeService.kt @@ -12,28 +12,46 @@ import javax.annotation.concurrent.ThreadSafe */ @ThreadSafe abstract class AbstractNodeService(val net: MessagingService) { - /** - * Postfix for base topics when sending a request to a service. - */ - protected val topicDefaultPostfix = ".0" /** * Register a handler for a message topic. In comparison to using net.addMessageHandler() this manages a lot of - * common boilerplate code. + * common boilerplate code. Exceptions are caught and passed to the provided consumer. + * + * @param topic the topic, without the default session ID postfix (".0) + * @param handler a function to handle the deserialised request and return a response + * @param exceptionConsumer a function to which any thrown exception is passed. */ protected inline fun addMessageHandler(topic: String, crossinline handler: (Q) -> R, - crossinline exceptionHandler: (Message, Exception) -> Unit) { - net.addMessageHandler(topic + topicDefaultPostfix, null) { message, r -> + crossinline exceptionConsumer: (Message, Exception) -> Unit) { + net.addMessageHandler(topic + TOPIC_DEFAULT_POSTFIX, null) { message, r -> try { val req = message.data.deserialize() val data = handler(req) val msg = net.createMessage(topic + "." + req.sessionID, data.serialize().bits) net.send(msg, req.replyTo) } catch(e: Exception) { - exceptionHandler(message, e) + exceptionConsumer(message, e) } } } -} \ No newline at end of file + + /** + * Register a handler for a message topic. In comparison to using net.addMessageHandler() this manages a lot of + * common boilerplate code. Exceptions are propagated to the messaging layer. + * + * @param topic the topic, without the default session ID postfix (".0) + * @param handler a function to handle the deserialised request and return a response + */ + protected inline fun + addMessageHandler(topic: String, + crossinline handler: (Q) -> R) { + net.addMessageHandler(topic + TOPIC_DEFAULT_POSTFIX, null) { message, r -> + val req = message.data.deserialize() + val data = handler(req) + val msg = net.createMessage(topic + "." + req.sessionID, data.serialize().bits) + net.send(msg, req.replyTo) + } + } +} diff --git a/src/main/kotlin/core/node/services/NetworkMapCache.kt b/src/main/kotlin/core/node/services/NetworkMapCache.kt index 770847999a..b5155cd85e 100644 --- a/src/main/kotlin/core/node/services/NetworkMapCache.kt +++ b/src/main/kotlin/core/node/services/NetworkMapCache.kt @@ -1,8 +1,18 @@ package core.node.services +import com.google.common.util.concurrent.ListenableFuture +import com.google.common.util.concurrent.MoreExecutors +import com.google.common.util.concurrent.SettableFuture import core.Contract import core.Party +import core.crypto.SecureHash +import core.messaging.MessagingService +import core.messaging.StateMachineManager +import core.messaging.runOnNextMessage import core.node.NodeInfo +import core.random63BitValue +import core.serialization.deserialize +import core.serialization.serialize import core.utilities.AddOrRemove import org.slf4j.LoggerFactory import java.security.SignatureException @@ -20,6 +30,8 @@ interface NetworkMapCache { val logger = LoggerFactory.getLogger(NetworkMapCache::class.java) } + /** A list of nodes that advertise a network map service */ + val networkMapNodes: List /** A list of nodes that advertise a timestamping service */ val timestampingNodes: List /** A list of nodes that advertise a rates oracle service */ @@ -54,6 +66,20 @@ interface NetworkMapCache { */ fun getRecommended(type: ServiceType, contract: Contract, vararg party: Party): NodeInfo? + /** + * Add a network map service; fetches a copy of the latest map from the service and subscribes to any further + * updates. + * + * @param smm state machine manager to use when requesting + * @param net the network messaging service + * @param service the network map service to fetch current state from. + * @param subscribe if the cache should subscribe to updates + * @param ifChangedSinceVer an optional version number to limit updating the map based on. If the latest map + * version is less than or equal to the given version, no update is fetched. + */ + fun addMapService(smm: StateMachineManager, net: MessagingService, service: NodeInfo, + subscribe: Boolean, ifChangedSinceVer: Int? = null): ListenableFuture + /** * Adds a node to the local cache (generally only used for adding ourselves) */ @@ -63,6 +89,15 @@ interface NetworkMapCache { * Removes a node from the local cache */ fun removeNode(node: NodeInfo) + + /** + * Deregister from updates from the given map service. + * + * @param smm state machine manager to use when requesting + * @param net the network messaging service + * @param service the network map service to fetch current state from. + */ + fun deregisterForUpdates(smm: StateMachineManager, net: MessagingService, service: NodeInfo): ListenableFuture } /** @@ -70,6 +105,8 @@ interface NetworkMapCache { */ @ThreadSafe open class InMemoryNetworkMapCache() : NetworkMapCache { + override val networkMapNodes: List + get() = get(NetworkMapService.Type) override val regulators: List get() = get(RegulatorService.Type) override val timestampingNodes: List @@ -86,6 +123,45 @@ open class InMemoryNetworkMapCache() : NetworkMapCache { override fun get(serviceType: ServiceType) = registeredNodes.filterValues { it.advertisedServices.contains(serviceType) }.map { it.value } override fun getRecommended(type: ServiceType, contract: Contract, vararg party: Party): NodeInfo? = get(type).firstOrNull() + override fun addMapService(smm: StateMachineManager, net: MessagingService, service: NodeInfo, subscribe: Boolean, + ifChangedSinceVer: Int?): ListenableFuture { + if (subscribe && !registeredForPush) { + // Add handler to the network, for updates received from the remote network map service. + net.addMessageHandler(NetworkMapService.PUSH_PROTOCOL_TOPIC + ".0", null) { message, r -> + try { + val req = message.data.deserialize() + val hash = SecureHash.sha256(req.wireReg.serialize().bits) + val ackMessage = net.createMessage(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC + TOPIC_DEFAULT_POSTFIX, + NetworkMapService.UpdateAcknowledge(hash, net.myAddress).serialize().bits) + net.send(ackMessage, req.replyTo) + processUpdatePush(req) + } catch(e: NodeMapError) { + NetworkMapCache.logger.warn("Failure during node map update due to bad update: ${e.javaClass.name}") + } catch(e: Exception) { + NetworkMapCache.logger.error("Exception processing update from network map service", e) + } + } + registeredForPush = true + } + + // Fetch the network map and register for updates at the same time + val sessionID = random63BitValue() + val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVer, net.myAddress, sessionID) + + // Add a message handler for the response, and prepare a future to put the data into. + // Note that the message handler will run on the network thread (not this one). + val future = SettableFuture.create() + net.runOnNextMessage(NetworkMapService.FETCH_PROTOCOL_TOPIC + "." + sessionID, MoreExecutors.directExecutor()) { message -> + val resp = message.data.deserialize() + // We may not receive any nodes back, if the map hasn't changed since the version specified + resp.nodes?.forEach { processRegistration(it) } + future.set(Unit) + } + net.send(net.createMessage(NetworkMapService.FETCH_PROTOCOL_TOPIC + ".0", req.serialize().bits), service.address) + + return future + } + override fun addNode(node: NodeInfo) { registeredNodes[node.identity] = node } @@ -93,6 +169,51 @@ open class InMemoryNetworkMapCache() : NetworkMapCache { override fun removeNode(node: NodeInfo) { registeredNodes.remove(node.identity) } + + /** + * Unsubscribes from updates from the given map service. + * + * @param service the network map service to listen to updates from. + */ + override fun deregisterForUpdates(smm: StateMachineManager, net: MessagingService, service: NodeInfo): ListenableFuture { + // Fetch the network map and register for updates at the same time + val sessionID = random63BitValue() + val req = NetworkMapService.SubscribeRequest(false, net.myAddress, sessionID) + + // Add a message handler for the response, and prepare a future to put the data into. + // Note that the message handler will run on the network thread (not this one). + val future = SettableFuture.create() + net.runOnNextMessage(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC + "." + sessionID, MoreExecutors.directExecutor()) { message -> + val resp = message.data.deserialize() + if (resp.confirmed) { + future.set(Unit) + } else { + future.setException(NetworkCacheError.DeregistrationFailed()) + } + } + net.send(net.createMessage(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC + ".0", req.serialize().bits), service.address) + + return future + } + + fun processUpdatePush(req: NetworkMapService.Update) { + val reg: NodeRegistration + try { + reg = req.wireReg.verified() + } catch(e: SignatureException) { + throw NodeMapError.InvalidSignature() + } + processRegistration(reg) + } + + private fun processRegistration(reg: NodeRegistration) { + // TODO: Implement filtering by sequence number, so we only accept changes that are + // more recent than the latest change we've processed. + when (reg.type) { + AddOrRemove.ADD -> addNode(reg.node) + AddOrRemove.REMOVE -> removeNode(reg.node) + } + } } sealed class NetworkCacheError : Exception() { diff --git a/src/main/kotlin/core/node/services/NetworkMapService.kt b/src/main/kotlin/core/node/services/NetworkMapService.kt new file mode 100644 index 0000000000..f17427b32c --- /dev/null +++ b/src/main/kotlin/core/node/services/NetworkMapService.kt @@ -0,0 +1,299 @@ +package core.node.services + +import co.paralleluniverse.common.util.VisibleForTesting +import core.Party +import core.ThreadBox +import core.crypto.DigitalSignature +import core.crypto.SecureHash +import core.crypto.SignedData +import core.crypto.signWithECDSA +import core.messaging.MessageRecipients +import core.messaging.MessagingService +import core.messaging.SingleMessageRecipient +import core.node.NodeInfo +import core.serialization.SerializedBytes +import core.serialization.deserialize +import core.serialization.serialize +import core.utilities.AddOrRemove +import org.slf4j.LoggerFactory +import protocols.* +import java.security.PrivateKey +import java.time.Period +import java.time.Instant +import java.util.ArrayList +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.ConcurrentHashMap +import javax.annotation.concurrent.ThreadSafe + + +/** + * A network map contains lists of nodes on the network along with information about their identity keys, services + * they provide and host names or IP addresses where they can be connected to. This information is cached locally within + * nodes, by the [NetworkMapCache]. Currently very basic consensus controls are applied, using signed changes which + * replace each other based on a serial number present in the change. + */ +// TODO: A better architecture for the network map service might be one like the Tor directory authorities, where +// several nodes linked by RAFT or Paxos elect a leader and that leader distributes signed documents describing the +// network layout. Those documents can then be cached by every node and thus a network map can/ be retrieved given only +// a single successful peer connection. +// +// It may also be that this is replaced or merged with the identity management service; for example if the network has +// a concept of identity changes over time, should that include the node for an identity? If so, that is likely to +// replace this service. +interface NetworkMapService { + object Type : ServiceType("corda.network_map") + + companion object { + val DEFAULT_EXPIRATION_PERIOD = Period.ofWeeks(4) + val FETCH_PROTOCOL_TOPIC = "platform.network_map.fetch" + val QUERY_PROTOCOL_TOPIC = "platform.network_map.query" + val REGISTER_PROTOCOL_TOPIC = "platform.network_map.register" + val SUBSCRIPTION_PROTOCOL_TOPIC = "platform.network_map.subscribe" + // Base topic used when pushing out updates to the network map. Consumed, for example, by the map cache. + // When subscribing to these updates, remember they must be acknowledged + val PUSH_PROTOCOL_TOPIC = "platform.network_map.push" + // Base topic for messages acknowledging pushed updates + val PUSH_ACK_PROTOCOL_TOPIC = "platform.network_map.push_ack" + + val logger = LoggerFactory.getLogger(NetworkMapService::class.java) + } + + val nodes: List + + class FetchMapRequest(val subscribe: Boolean, val ifChangedSinceVersion: Int?, replyTo: MessageRecipients, sessionID: Long) : AbstractRequestMessage(replyTo, sessionID) + data class FetchMapResponse(val nodes: Collection?, val version: Int) + class QueryIdentityRequest(val identity: Party, replyTo: MessageRecipients, sessionID: Long) : AbstractRequestMessage(replyTo, sessionID) + data class QueryIdentityResponse(val node: NodeInfo?) + class RegistrationRequest(val wireReg: WireNodeRegistration, replyTo: MessageRecipients, sessionID: Long) : AbstractRequestMessage(replyTo, sessionID) + data class RegistrationResponse(val success: Boolean) + class SubscribeRequest(val subscribe: Boolean, replyTo: MessageRecipients, sessionID: Long) : AbstractRequestMessage(replyTo, sessionID) + data class SubscribeResponse(val confirmed: Boolean) + data class Update(val wireReg: WireNodeRegistration, val replyTo: MessageRecipients) + data class UpdateAcknowledge(val wireRegHash: SecureHash, val replyTo: MessageRecipients) +} + +@ThreadSafe +class InMemoryNetworkMapService(net: MessagingService, home: NodeRegistration, val cache: NetworkMapCache) : NetworkMapService, AbstractNodeService(net) { + private val registeredNodes = ConcurrentHashMap() + // Map from subscriber address, to a list of unacknowledged updates + private val subscribers = ThreadBox(mutableMapOf>()) + private val mapVersion = AtomicInteger(1) + /** Maximum number of unacknowledged updates to send to a node before automatically unregistering them for updates */ + val maxUnacknowledgedUpdates = 10 + /** + * Maximum credible size for a registration request. Generally requests are around 500-600 bytes, so this gives a + * 10 times overhead. + */ + val maxSizeRegistrationRequestBytes = 5500 + + // Filter reduces this to the entries that add a node to the map + override val nodes: List + get() = registeredNodes.mapNotNull { if (it.value.type == AddOrRemove.ADD) it.value.node else null } + + init { + // Register the local node with the service + val homeIdentity = home.node.identity + registeredNodes[homeIdentity] = home + + // Register message handlers + addMessageHandler(NetworkMapService.FETCH_PROTOCOL_TOPIC, + { req: NetworkMapService.FetchMapRequest -> processFetchAllRequest(req) } + ) + addMessageHandler(NetworkMapService.QUERY_PROTOCOL_TOPIC, + { req: NetworkMapService.QueryIdentityRequest -> processQueryRequest(req) } + ) + addMessageHandler(NetworkMapService.REGISTER_PROTOCOL_TOPIC, + { req: NetworkMapService.RegistrationRequest -> processRegistrationChangeRequest(req) } + ) + addMessageHandler(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC, + { req: NetworkMapService.SubscribeRequest -> processSubscriptionRequest(req) } + ) + net.addMessageHandler(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC + TOPIC_DEFAULT_POSTFIX, null) { message, r -> + val req = message.data.deserialize() + processAcknowledge(req) + } + } + + private fun addSubscriber(subscriber: MessageRecipients) { + if (subscriber !is SingleMessageRecipient) throw NodeMapError.InvalidSubscriber() + subscribers.locked { + if (!containsKey(subscriber)) { + put(subscriber, mutableListOf()) + } + } + } + + private fun removeSubscriber(subscriber: MessageRecipients) { + if (subscriber !is SingleMessageRecipient) throw NodeMapError.InvalidSubscriber() + subscribers.locked { remove(subscriber) } + } + + @VisibleForTesting + fun getUnacknowledgedCount(subscriber: SingleMessageRecipient): Int? + = subscribers.locked { get(subscriber)?.count() } + + @VisibleForTesting + fun notifySubscribers(wireReg: WireNodeRegistration) { + // TODO: Once we have a better established messaging system, we can probably send + // to a MessageRecipientGroup that nodes join/leave, rather than the network map + // service itself managing the group + val update = NetworkMapService.Update(wireReg, net.myAddress).serialize().bits + val topic = NetworkMapService.PUSH_PROTOCOL_TOPIC + TOPIC_DEFAULT_POSTFIX + val message = net.createMessage(topic, update) + + subscribers.locked { + val toRemove = mutableListOf() + val hash = SecureHash.sha256(wireReg.raw.bits) + forEach { subscriber: Map.Entry> -> + val unacknowledged = subscriber.value + if (unacknowledged.count() < maxUnacknowledgedUpdates) { + unacknowledged.add(hash) + net.send(message, subscriber.key) + } else { + toRemove.add(subscriber.key) + } + } + toRemove.forEach { remove(it) } + } + } + + @VisibleForTesting + fun processAcknowledge(req: NetworkMapService.UpdateAcknowledge): Unit { + subscribers.locked { + this[req.replyTo]?.remove(req.wireRegHash) + } + } + + @VisibleForTesting + fun processFetchAllRequest(req: NetworkMapService.FetchMapRequest): NetworkMapService.FetchMapResponse { + if (req.subscribe) { + addSubscriber(req.replyTo) + } + val ver = mapVersion.get() + if (req.ifChangedSinceVersion == null || req.ifChangedSinceVersion < ver) { + val nodes = ArrayList(registeredNodes.values) // Snapshot to avoid attempting to serialise ConcurrentHashMap internals + return NetworkMapService.FetchMapResponse(nodes, ver) + } else { + return NetworkMapService.FetchMapResponse(null, ver) + } + } + + @VisibleForTesting + fun processQueryRequest(req: NetworkMapService.QueryIdentityRequest): NetworkMapService.QueryIdentityResponse { + val candidate = registeredNodes[req.identity] + + // If the most recent record we have is of the node being removed from the map, then it's considered + // as no match. + if (candidate == null || candidate.type == AddOrRemove.REMOVE) { + return NetworkMapService.QueryIdentityResponse(null) + } else { + return NetworkMapService.QueryIdentityResponse(candidate.node) + } + } + + @VisibleForTesting + fun processRegistrationChangeRequest(req: NetworkMapService.RegistrationRequest): NetworkMapService.RegistrationResponse { + require(req.wireReg.raw.size < maxSizeRegistrationRequestBytes) + val change: NodeRegistration + + try { + change = req.wireReg.verified() + } catch(e: java.security.SignatureException) { + throw NodeMapError.InvalidSignature() + } + val node = change.node + + var changed: Boolean = false + // Update the current value atomically, so that if multiple updates come + // in on different threads, there is no risk of a race condition while checking + // sequence numbers. + registeredNodes.compute(node.identity, { mapKey: Party, existing: NodeRegistration? -> + changed = existing == null || existing.serial < change.serial + if (changed) { + when (change.type) { + AddOrRemove.ADD -> change + AddOrRemove.REMOVE -> change + else -> throw NodeMapError.UnknownChangeType() + } + } else { + existing + } + }) + if (changed) { + notifySubscribers(req.wireReg) + + // Update the local cache + // TODO: Once local messaging is fixed, this should go over the network layer as it does to other + // subscribers + when (change.type) { + AddOrRemove.ADD -> { + NetworkMapService.logger.info("Added node ${node.address} to network map") + cache.addNode(change.node) + } + AddOrRemove.REMOVE -> { + NetworkMapService.logger.info("Removed node ${node.address} from network map") + cache.removeNode(change.node) + } + } + + mapVersion.incrementAndGet() + } + return NetworkMapService.RegistrationResponse(changed) + } + + @VisibleForTesting + fun processSubscriptionRequest(req: NetworkMapService.SubscribeRequest): NetworkMapService.SubscribeResponse { + when (req.subscribe) { + false -> removeSubscriber(req.replyTo) + true -> addSubscriber(req.replyTo) + } + return NetworkMapService.SubscribeResponse(true) + } +} + +/** + * A node registration state in the network map. + * + * @param node the node being added/removed. + * @param serial an increasing value which represents the version of this registration. Not expected to be sequential, + * but later versions of the registration must have higher values (or they will be ignored by the map service). + * Similar to the serial number on DNS records. + * @param type add if the node is being added to the map, or remove if a previous node is being removed (indicated as + * going offline). + * @param expires when the registration expires. Only used when adding a node to a map. + */ +// TODO: This might alternatively want to have a node and party, with the node being optional, so registering a node +// involves providing both node and paerty, and deregistering a node involves a request with party but no node. +class NodeRegistration(val node: NodeInfo, val serial: Long, val type: AddOrRemove, var expires: Instant) { + /** + * Build a node registration in wire format. + */ + fun toWire(privateKey: PrivateKey): WireNodeRegistration { + val regSerialized = this.serialize() + val regSig = privateKey.signWithECDSA(regSerialized.bits, node.identity.owningKey) + + return WireNodeRegistration(regSerialized, regSig) + } +} + +/** + * A node registration and its signature as a pair. + */ +class WireNodeRegistration(raw: SerializedBytes, sig: DigitalSignature.WithKey) : SignedData(raw, sig) { + @Throws(IllegalArgumentException::class) + override fun verifyData(reg: NodeRegistration) { + require(reg.node.identity.owningKey == sig.by) + } +} + +sealed class NodeMapError : Exception() { + + /** Thrown if the signature on the node info does not match the public key for the identity */ + class InvalidSignature : NodeMapError() + + /** Thrown if the replyTo of a subscription change message is not a single message recipient */ + class InvalidSubscriber : NodeMapError() + + /** Thrown if a change arrives which is of an unknown type */ + class UnknownChangeType : NodeMapError() +}