wip; updates to grid-manager impl

This commit is contained in:
meejah 2018-05-22 10:42:34 -06:00
parent 64eb9d7c30
commit ee3e1cbcf2
3 changed files with 111 additions and 8 deletions

View File

@ -15,6 +15,7 @@ from twisted.application.internet import TimerService
from twisted.python.filepath import FilePath
from twisted.python.failure import Failure
from pycryptopp.publickey import rsa
from pycryptopp.publickey import ed25519
import allmydata
from allmydata.storage.server import StorageServer
@ -31,6 +32,7 @@ from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.util.time_format import parse_duration, parse_date
from allmydata.util.i2p_provider import create as create_i2p_provider
from allmydata.util.tor_provider import create as create_tor_provider
from allmydata.util.base32 import a2b, b2a
from allmydata.stats import StatsProvider
from allmydata.history import History
from allmydata.interfaces import IStatsProducer, SDMF_VERSION, MDMF_VERSION, DEFAULT_MAX_SEGMENT_SIZE
@ -58,6 +60,7 @@ def _valid_config_sections():
"shares.needed",
"shares.total",
"stats_gatherer.furl",
"grid_managers",
),
"drop_upload": ( # deprecated already?
"enabled",
@ -380,10 +383,28 @@ def create_storage_farm_broker(config, default_connection_handlers, foolscap_con
**kwargs
)
# grid manager setup
grid_manager_keys = []
gm_keydata = self.get_config('client', 'grid_manager_public_keys', '')
for gm_key in gm_keydata.strip().split():
grid_manager_keys.append(
keyutil.parse_pubkey(a2b(gm_key))
)
my_pubkey = keyutil.parse_pubkey(
self.get_config_from_file("node.pubkey")
)
# create the actual storage-broker
sb = storage_client.StorageFarmBroker(
permute_peers=True,
tub_maker=tub_creator,
preferred_peers=preferred_peers,
grid_manager_keys=grid_manager_keys,
node_pubkey=my_pubkey,
)
for ic in introducer_clients:
sb.use_introducer(ic)

View File

@ -515,7 +515,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
# 0. Start with an ordered list of servers. Maybe *2N* of them.
#
all_servers = storage_broker.get_servers_for_psi(storage_index)
all_servers = storage_broker.get_servers_for_psi(storage_index, for_upload=True)
if not all_servers:
raise NoServersError("client gave us zero servers")

View File

@ -29,7 +29,11 @@ the foolscap-based server implemented in src/allmydata/storage/*.py .
# 6: implement other sorts of IStorageClient classes: S3, etc
import re, time, hashlib
import re
import time
import hashlib
import json
from zope.interface import implementer
from twisted.internet import defer
from twisted.application import service
@ -67,12 +71,14 @@ class StorageFarmBroker(service.MultiService):
I'm also responsible for subscribing to the IntroducerClient to find out
about new servers as they are announced by the Introducer.
"""
def __init__(self, permute_peers, tub_maker, preferred_peers=()):
def __init__(self, permute_peers, tub_maker, preferred_peers=(), grid_manager_keys=[], node_pubkey=None):
service.MultiService.__init__(self)
assert permute_peers # False not implemented yet
self.permute_peers = permute_peers
self._tub_maker = tub_maker
self.preferred_peers = preferred_peers
self._grid_manager_keys = grid_manager_keys
self._node_pubkey = node_pubkey
# self.servers maps serverid -> IServer, and keeps track of all the
# storage servers that we've heard about. Each descriptor manages its
@ -91,7 +97,7 @@ class StorageFarmBroker(service.MultiService):
self._static_server_ids.add(server_id)
handler_overrides = server.get("connections", {})
s = NativeStorageServer(server_id, server["ann"],
self._tub_maker, handler_overrides)
self._tub_maker, handler_overrides, [], self._node_pubkey)
s.on_status_changed(lambda _: self._got_connection())
s.setServiceParent(self)
self.servers[server_id] = s
@ -110,7 +116,7 @@ class StorageFarmBroker(service.MultiService):
# these two are used in unit tests
def test_add_rref(self, serverid, rref, ann):
s = NativeStorageServer(serverid, ann.copy(), self._tub_maker, {})
s = NativeStorageServer(serverid, ann.copy(), self._tub_maker, {}, [], self._node_pubkey)
s.rref = rref
s._is_connected = True
self.servers[serverid] = s
@ -151,7 +157,7 @@ class StorageFarmBroker(service.MultiService):
facility="tahoe.storage_broker", umid="AlxzqA",
level=log.UNUSUAL)
return
s = NativeStorageServer(server_id, ann, self._tub_maker, {})
s = NativeStorageServer(server_id, ann, self._tub_maker, {}, self._grid_manager_keys, self._node_pubkey)
s.on_status_changed(lambda _: self._got_connection())
server_id = s.get_serverid()
old = self.servers.get(server_id)
@ -192,11 +198,26 @@ class StorageFarmBroker(service.MultiService):
for dsc in self.servers.values():
dsc.try_to_connect()
def get_servers_for_psi(self, peer_selection_index):
def get_servers_for_psi(self, peer_selection_index, for_upload=False):
"""
:param for_upload: used to determine if we should include any
servers that are invalid according to Grid Manager
processing. When for_upload is True and we have any Grid
Manager keys configured, any storage servers with invalid or
missing certificates will be excluded.
"""
# return a list of server objects (IServers)
assert self.permute_peers == True
connected_servers = self.get_connected_servers()
preferred_servers = frozenset(s for s in connected_servers if s.get_longname() in self.preferred_peers)
if for_upload:
print("upload processing: {}".format([srv.upload_permitted() for srv in connected_servers]))
connected_servers = [
srv
for srv in connected_servers
if srv.upload_permitted()
]
def _permuted(server):
seed = server.get_permutation_seed()
is_unpreferred = server not in preferred_servers
@ -248,6 +269,38 @@ class StubServer(object):
def get_nickname(self):
return "?"
def _validate_grid_manager_certificate(gm_key, alleged_cert, storage_pubkey):
"""
:param gm_key: a VerifyingKey instance, a Grid Manager's public
key.
:param cert: dict with "certificate" and "signature" keys, where
"certificate" contains a JSON-serialized certificate for a Storage
Server (comes from a Grid Manager).
:return: False if the signature is invalid or the certificate is
expired.
"""
try:
gm_key.verify(
alleged_cert['signature'],
alleged_cert['certificate'],
)
except Exception:
return False
# signature is valid; now we can load the actual data
cert = json.loads(alleged_cert['certificate'])
now = datetime.utcnow()
expires = datetime.fromordinal(cert['expires'])
cert_pubkey = keyutil.parse_pubkey(cert['public_key'])
if cert_pubkey != storage_pubkey:
return False # certificate is for wrong server
if expires < now:
return False # certificate is expired
return True
@implementer(IServer)
class NativeStorageServer(service.MultiService):
"""I hold information about a storage server that we want to connect to.
@ -276,13 +329,15 @@ class NativeStorageServer(service.MultiService):
"application-version": "unknown: no get_version()",
}
def __init__(self, server_id, ann, tub_maker, handler_overrides):
def __init__(self, server_id, ann, tub_maker, handler_overrides, grid_manager_keys, node_pubkey):
service.MultiService.__init__(self)
assert isinstance(server_id, str)
self._server_id = server_id
self.announcement = ann
self._tub_maker = tub_maker
self._handler_overrides = handler_overrides
self._node_pubkey = node_pubkey
self._grid_manager_keys = grid_manager_keys
assert "anonymous-storage-FURL" in ann, ann
furl = str(ann["anonymous-storage-FURL"])
@ -321,6 +376,33 @@ class NativeStorageServer(service.MultiService):
self._trigger_cb = None
self._on_status_changed = ObserverList()
def upload_permitted(self):
"""
If our client is configured with Grid Manager public-keys, we will
only upload to storage servers that have a currently-valid
certificate signed by at least one of the Grid Managers we
accept.
:return: True if we should use this server for uploads, False
otherwise.
"""
# if we have no Grid Manager keys configured, choice is easy
if not self._grid_manager_keys:
return True
# XXX probably want to cache the answer to this? (ignoring
# that for now because certificates expire, so .. slightly
# more complex)
certificates = self.announcements.get("grid-manager-certificates", None)
if not certificates:
return False
for gm_key in self._grid_manager_keys:
for cert in certificates:
if _validate_grid_manager_certificate(gm_key, cert, self._pubkey):
return True
return False
def on_status_changed(self, status_changed):
"""
:param status_changed: a callable taking a single arg (the