From ee3e1cbcf23a179d8d125a66d12dc7b382aa31d9 Mon Sep 17 00:00:00 2001 From: meejah Date: Tue, 22 May 2018 10:42:34 -0600 Subject: [PATCH] wip; updates to grid-manager impl --- src/allmydata/client.py | 21 +++++++ src/allmydata/immutable/upload.py | 2 +- src/allmydata/storage_client.py | 96 ++++++++++++++++++++++++++++--- 3 files changed, 111 insertions(+), 8 deletions(-) diff --git a/src/allmydata/client.py b/src/allmydata/client.py index 4702436ed..b9225e169 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -15,6 +15,7 @@ from twisted.application.internet import TimerService from twisted.python.filepath import FilePath from twisted.python.failure import Failure from pycryptopp.publickey import rsa +from pycryptopp.publickey import ed25519 import allmydata from allmydata.storage.server import StorageServer @@ -31,6 +32,7 @@ from allmydata.util.abbreviate import parse_abbreviated_size from allmydata.util.time_format import parse_duration, parse_date from allmydata.util.i2p_provider import create as create_i2p_provider from allmydata.util.tor_provider import create as create_tor_provider +from allmydata.util.base32 import a2b, b2a from allmydata.stats import StatsProvider from allmydata.history import History from allmydata.interfaces import IStatsProducer, SDMF_VERSION, MDMF_VERSION, DEFAULT_MAX_SEGMENT_SIZE @@ -58,6 +60,7 @@ def _valid_config_sections(): "shares.needed", "shares.total", "stats_gatherer.furl", + "grid_managers", ), "drop_upload": ( # deprecated already? "enabled", @@ -380,10 +383,28 @@ def create_storage_farm_broker(config, default_connection_handlers, foolscap_con **kwargs ) + # grid manager setup + + grid_manager_keys = [] + gm_keydata = self.get_config('client', 'grid_manager_public_keys', '') + for gm_key in gm_keydata.strip().split(): + grid_manager_keys.append( + keyutil.parse_pubkey(a2b(gm_key)) + ) + + my_pubkey = keyutil.parse_pubkey( + self.get_config_from_file("node.pubkey") + ) + + # create the actual storage-broker + sb = storage_client.StorageFarmBroker( permute_peers=True, tub_maker=tub_creator, preferred_peers=preferred_peers, + grid_manager_keys=grid_manager_keys, + node_pubkey=my_pubkey, + ) for ic in introducer_clients: sb.use_introducer(ic) diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py index 6720e4195..79c632c91 100644 --- a/src/allmydata/immutable/upload.py +++ b/src/allmydata/immutable/upload.py @@ -515,7 +515,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): # 0. Start with an ordered list of servers. Maybe *2N* of them. # - all_servers = storage_broker.get_servers_for_psi(storage_index) + all_servers = storage_broker.get_servers_for_psi(storage_index, for_upload=True) if not all_servers: raise NoServersError("client gave us zero servers") diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index 65c65f535..34e96f38a 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -29,7 +29,11 @@ the foolscap-based server implemented in src/allmydata/storage/*.py . # 6: implement other sorts of IStorageClient classes: S3, etc -import re, time, hashlib +import re +import time +import hashlib +import json + from zope.interface import implementer from twisted.internet import defer from twisted.application import service @@ -67,12 +71,14 @@ class StorageFarmBroker(service.MultiService): I'm also responsible for subscribing to the IntroducerClient to find out about new servers as they are announced by the Introducer. """ - def __init__(self, permute_peers, tub_maker, preferred_peers=()): + def __init__(self, permute_peers, tub_maker, preferred_peers=(), grid_manager_keys=[], node_pubkey=None): service.MultiService.__init__(self) assert permute_peers # False not implemented yet self.permute_peers = permute_peers self._tub_maker = tub_maker self.preferred_peers = preferred_peers + self._grid_manager_keys = grid_manager_keys + self._node_pubkey = node_pubkey # self.servers maps serverid -> IServer, and keeps track of all the # storage servers that we've heard about. Each descriptor manages its @@ -91,7 +97,7 @@ class StorageFarmBroker(service.MultiService): self._static_server_ids.add(server_id) handler_overrides = server.get("connections", {}) s = NativeStorageServer(server_id, server["ann"], - self._tub_maker, handler_overrides) + self._tub_maker, handler_overrides, [], self._node_pubkey) s.on_status_changed(lambda _: self._got_connection()) s.setServiceParent(self) self.servers[server_id] = s @@ -110,7 +116,7 @@ class StorageFarmBroker(service.MultiService): # these two are used in unit tests def test_add_rref(self, serverid, rref, ann): - s = NativeStorageServer(serverid, ann.copy(), self._tub_maker, {}) + s = NativeStorageServer(serverid, ann.copy(), self._tub_maker, {}, [], self._node_pubkey) s.rref = rref s._is_connected = True self.servers[serverid] = s @@ -151,7 +157,7 @@ class StorageFarmBroker(service.MultiService): facility="tahoe.storage_broker", umid="AlxzqA", level=log.UNUSUAL) return - s = NativeStorageServer(server_id, ann, self._tub_maker, {}) + s = NativeStorageServer(server_id, ann, self._tub_maker, {}, self._grid_manager_keys, self._node_pubkey) s.on_status_changed(lambda _: self._got_connection()) server_id = s.get_serverid() old = self.servers.get(server_id) @@ -192,11 +198,26 @@ class StorageFarmBroker(service.MultiService): for dsc in self.servers.values(): dsc.try_to_connect() - def get_servers_for_psi(self, peer_selection_index): + def get_servers_for_psi(self, peer_selection_index, for_upload=False): + """ + :param for_upload: used to determine if we should include any + servers that are invalid according to Grid Manager + processing. When for_upload is True and we have any Grid + Manager keys configured, any storage servers with invalid or + missing certificates will be excluded. + """ # return a list of server objects (IServers) assert self.permute_peers == True connected_servers = self.get_connected_servers() preferred_servers = frozenset(s for s in connected_servers if s.get_longname() in self.preferred_peers) + if for_upload: + print("upload processing: {}".format([srv.upload_permitted() for srv in connected_servers])) + connected_servers = [ + srv + for srv in connected_servers + if srv.upload_permitted() + ] + def _permuted(server): seed = server.get_permutation_seed() is_unpreferred = server not in preferred_servers @@ -248,6 +269,38 @@ class StubServer(object): def get_nickname(self): return "?" + +def _validate_grid_manager_certificate(gm_key, alleged_cert, storage_pubkey): + """ + :param gm_key: a VerifyingKey instance, a Grid Manager's public + key. + + :param cert: dict with "certificate" and "signature" keys, where + "certificate" contains a JSON-serialized certificate for a Storage + Server (comes from a Grid Manager). + + :return: False if the signature is invalid or the certificate is + expired. + """ + try: + gm_key.verify( + alleged_cert['signature'], + alleged_cert['certificate'], + ) + except Exception: + return False + # signature is valid; now we can load the actual data + cert = json.loads(alleged_cert['certificate']) + now = datetime.utcnow() + expires = datetime.fromordinal(cert['expires']) + cert_pubkey = keyutil.parse_pubkey(cert['public_key']) + if cert_pubkey != storage_pubkey: + return False # certificate is for wrong server + if expires < now: + return False # certificate is expired + return True + + @implementer(IServer) class NativeStorageServer(service.MultiService): """I hold information about a storage server that we want to connect to. @@ -276,13 +329,15 @@ class NativeStorageServer(service.MultiService): "application-version": "unknown: no get_version()", } - def __init__(self, server_id, ann, tub_maker, handler_overrides): + def __init__(self, server_id, ann, tub_maker, handler_overrides, grid_manager_keys, node_pubkey): service.MultiService.__init__(self) assert isinstance(server_id, str) self._server_id = server_id self.announcement = ann self._tub_maker = tub_maker self._handler_overrides = handler_overrides + self._node_pubkey = node_pubkey + self._grid_manager_keys = grid_manager_keys assert "anonymous-storage-FURL" in ann, ann furl = str(ann["anonymous-storage-FURL"]) @@ -321,6 +376,33 @@ class NativeStorageServer(service.MultiService): self._trigger_cb = None self._on_status_changed = ObserverList() + def upload_permitted(self): + """ + If our client is configured with Grid Manager public-keys, we will + only upload to storage servers that have a currently-valid + certificate signed by at least one of the Grid Managers we + accept. + + :return: True if we should use this server for uploads, False + otherwise. + """ + # if we have no Grid Manager keys configured, choice is easy + if not self._grid_manager_keys: + return True + + # XXX probably want to cache the answer to this? (ignoring + # that for now because certificates expire, so .. slightly + # more complex) + certificates = self.announcements.get("grid-manager-certificates", None) + if not certificates: + return False + for gm_key in self._grid_manager_keys: + for cert in certificates: + if _validate_grid_manager_certificate(gm_key, cert, self._pubkey): + return True + return False + + def on_status_changed(self, status_changed): """ :param status_changed: a callable taking a single arg (the