mutable/servermap: Rework the servermap to work with MDMF mutable files

Kevan Carstensen 2011-08-06 17:42:59 -07:00
parent cde17ec041
commit bb10d685ed


@@ -4,17 +4,17 @@ from zope.interface import implements
from itertools import count
from twisted.internet import defer
from twisted.python import failure
from foolscap.api import DeadReferenceError, RemoteException, eventually
from allmydata.util import base32, hashutil, idlib, log
from foolscap.api import DeadReferenceError, RemoteException, eventually, \
fireEventually
from allmydata.util import base32, hashutil, idlib, log, deferredutil
from allmydata.util.dictutil import DictOfSets
from allmydata.storage.server import si_b2a
from allmydata.interfaces import IServermapUpdaterStatus
from pycryptopp.publickey import rsa
from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
CorruptShareError, NeedMoreDataError
from allmydata.mutable.layout import unpack_prefix_and_signature, unpack_header, unpack_share, \
SIGNED_PREFIX_LENGTH
CorruptShareError
from allmydata.mutable.layout import SIGNED_PREFIX_LENGTH, MDMFSlotReadProxy
class UpdateStatus:
implements(IServermapUpdaterStatus)
@@ -121,6 +121,7 @@ class ServerMap:
self.bad_shares = {} # maps (peerid,shnum) to old checkstring
self.last_update_mode = None
self.last_update_time = 0
self.update_data = {} # shnum => [(verinfo, data), ...]
def copy(self):
s = ServerMap()
@@ -251,7 +252,6 @@ class ServerMap:
"""Return a set of versionids, one for each version that is currently
recoverable."""
versionmap = self.make_versionmap()
recoverable_versions = set()
for (verinfo, shares) in versionmap.items():
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
@@ -337,9 +337,25 @@ class ServerMap:
return False
def get_update_data_for_share_and_verinfo(self, shnum, verinfo):
"""
I return the update data for the given shnum and version (verinfo).
"""
update_data = self.update_data[shnum]
update_datum = [i[1] for i in update_data if i[0] == verinfo][0]
return update_datum
def set_update_data_for_share_and_verinfo(self, shnum, verinfo, data):
"""
I record the given update data (the block hash tree and the
first and last segments) for the given shnum and version.
"""
self.update_data.setdefault(shnum , []).append((verinfo, data))
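A minimal standalone sketch of the bookkeeping above (placeholder values, not real Tahoe-LAFS structures): each share number maps to a list of (verinfo, data) pairs, and lookups filter that list by verinfo.

update_data = {}   # shnum => [(verinfo, data), ...]

def set_update_data(shnum, verinfo, data):
    update_data.setdefault(shnum, []).append((verinfo, data))

def get_update_data(shnum, verinfo):
    # first datum recorded for this (shnum, verinfo) pair
    return [d for (v, d) in update_data[shnum] if v == verinfo][0]

set_update_data(0, "verinfo-A", ("blockhashes", "first block", "last block"))
assert get_update_data(0, "verinfo-A")[0] == "blockhashes"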
class ServermapUpdater:
def __init__(self, filenode, storage_broker, monitor, servermap,
mode=MODE_READ, add_lease=False):
mode=MODE_READ, add_lease=False, update_range=None):
"""I update a servermap, locating a sufficient number of useful
shares and remembering where they are located.
@@ -364,6 +380,7 @@ class ServermapUpdater:
self._servers_responded = set()
# how much data should we read?
# SDMF:
# * if we only need the checkstring, then [0:75]
# * if we need to validate the checkstring sig, then [543ish:799ish]
# * if we need the verification key, then [107:436ish]
@@ -371,21 +388,38 @@ class ServermapUpdater:
# * if we need the encrypted private key, we want [-1216ish:]
# * but we can't read from negative offsets
# * the offset table tells us the 'ish', also the positive offset
# A future version of the SDMF slot format should consider using
# fixed-size slots so we can retrieve less data. For now, we'll just
# read 4000 bytes, which also happens to read enough actual data to
# pre-fetch an 18-entry dirnode.
# MDMF:
# * Checkstring? [0:72]
# * If we want to validate the checkstring, then [0:72], [143:?] --
# the offset table will tell us for sure.
# * If we need the verification key, we have to consult the offset
# table as well.
# At this point, we don't know which kind of file this is. Our
# filenode can tell us, but it might be lying -- in some cases,
# we're responsible for telling it which kind of file it is.
self._read_size = 4000
if mode == MODE_CHECK:
# we use unpack_prefix_and_signature, so we need 1k
self._read_size = 1000
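# (Illustrative aside, not part of this change: with the sizes above,
# the initial query to each server boils down to one contiguous read,
# e.g. readv = [(0, self._read_size)], handed to slot_readv() below.)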
self._need_privkey = False
if mode == MODE_WRITE and not self._node.get_privkey():
self._need_privkey = True
# check+repair: repair requires the privkey, so if we didn't happen
# to ask for it during the check, we'll have problems doing the
# publish.
self.fetch_update_data = False
if mode == MODE_WRITE and update_range:
# We're updating the servermap in preparation for an
# in-place file update, so we need to fetch some additional
# data from each share that we find.
assert len(update_range) == 2
self.start_segment = update_range[0]
self.end_segment = update_range[1]
self.fetch_update_data = True
prefix = si_b2a(self._storage_index)[:5]
self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
si=prefix, mode=mode)
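For orientation, a caller preparing an in-place MDMF update would construct the updater roughly as below; the node, storage_broker, monitor and servermap objects, the segment numbers, and the update() entry point are assumed from the rest of the module rather than shown in this hunk.

# Hypothetical invocation, not part of this diff.
updater = ServermapUpdater(node, storage_broker, monitor, servermap,
                           mode=MODE_WRITE,
                           update_range=(first_segment, last_segment))
d = updater.update()   # fires with a ServerMap that also carries per-share
                       # update data for the first and last segments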
@@ -424,6 +458,7 @@ class ServermapUpdater:
self._queries_completed = 0
sb = self._storage_broker
# All of the peers, permuted by the storage index, as usual.
full_peerlist = [(s.get_serverid(), s.get_rref())
for s in sb.get_servers_for_psi(self._storage_index)]
self.full_peerlist = full_peerlist # for use later, immutable
@@ -431,8 +466,11 @@ class ServermapUpdater:
self._good_peers = set() # peers who had some shares
self._empty_peers = set() # peers who don't have any shares
self._bad_peers = set() # peers to whom our queries failed
self._readers = {} # peerid -> dict(sharewriters), filled in
# after responses come in.
k = self._node.get_required_shares()
# k is None if the filenode doesn't yet know its encoding parameters.
if k is None:
# make a guess
k = 3
@@ -445,6 +483,7 @@ class ServermapUpdater:
self.num_peers_to_query = k + self.EPSILON
if self.mode == MODE_CHECK:
# We want to query all of the peers.
initial_peers_to_query = dict(full_peerlist)
must_query = set(initial_peers_to_query.keys())
self.extra_peers = []
@@ -452,6 +491,7 @@ class ServermapUpdater:
# we're planning to replace all the shares, so we want a good
# chance of finding them all. We will keep searching until we've
# seen epsilon that don't have a share.
# We don't query all of the peers because that could take a while.
self.num_peers_to_query = N + self.EPSILON
initial_peers_to_query, must_query = self._build_initial_querylist()
self.required_num_empty_peers = self.EPSILON
@@ -461,7 +501,8 @@ class ServermapUpdater:
# might also avoid the round trip required to read the encrypted
# private key.
else:
else: # MODE_READ, MODE_ANYTHING
# 2k peers is good enough.
initial_peers_to_query, must_query = self._build_initial_querylist()
# this is a set of peers that we are required to get responses from:
@@ -476,6 +517,9 @@ class ServermapUpdater:
# before we can consider ourselves finished, and self.extra_peers
# contains the overflow (peers that we should tap if we don't get
# enough responses)
# self._must_query must be a subset of initial_peers_to_query;
# the assert below enforces that.
assert set(must_query).issubset(set(initial_peers_to_query))
self._send_initial_requests(initial_peers_to_query)
self._status.timings["initial_queries"] = time.time() - self._started
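To make the query-count arithmetic above concrete (illustrative numbers only; the real EPSILON comes from the class attribute):

k, N = 3, 10            # example 3-of-10 encoding
EPSILON = 3             # placeholder; the real value is the class attribute
assert k + EPSILON == 6      # default: MODE_READ / MODE_ANYTHING initial queries
assert N + EPSILON == 13     # MODE_WRITE initial queries (and we keep querying
                             # until EPSILON peers report no shares)
# MODE_CHECK queries every peer in the permuted list.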
@@ -532,8 +576,8 @@ class ServermapUpdater:
# errors that aren't handled by _query_failed (and errors caused by
# _query_failed) get logged, but we still want to check for doneness.
d.addErrback(log.err)
d.addBoth(self._check_for_done)
d.addErrback(self._fatal_error)
d.addCallback(self._check_for_done)
return d
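The reordered errback/callback chain above relies on a standard Twisted property: once log.err has handled a failure it returns None, so the Deferred switches back to the callback side and _check_for_done still runs. A tiny sketch:

from twisted.internet import defer
from twisted.python import log

d = defer.fail(RuntimeError("query blew up"))
d.addErrback(log.err)                          # logs and consumes the failure
d.addCallback(lambda ign: "still checking")    # runs, because the failure
                                               # was handled above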
def _do_read(self, ss, peerid, storage_index, shnums, readv):
@@ -552,20 +596,59 @@ class ServermapUpdater:
d = ss.callRemote("slot_readv", storage_index, shnums, readv)
return d
def _got_corrupt_share(self, e, shnum, peerid, data, lp):
"""
I am called when a remote server returns a corrupt share in
response to one of our queries. By corrupt, I mean a share
without a valid signature. I then record the failure, notify the
server of the corruption, and record the share as bad.
"""
f = failure.Failure(e)
self.log(format="bad share: %(f_value)s", f_value=str(f),
failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
# Notify the server that its share is corrupt.
self.notify_server_corruption(peerid, shnum, str(e))
# By flagging this as a bad peer, we won't count any of
# the other shares on that peer as valid, though if we
# happen to find a valid version string amongst those
# shares, we'll keep track of it so that we don't need
# to validate the signature on those again.
self._bad_peers.add(peerid)
self._last_failure = f
# XXX: Use the reader for this?
checkstring = data[:SIGNED_PREFIX_LENGTH]
self._servermap.mark_bad_share(peerid, shnum, checkstring)
self._servermap.problems.append(f)
def _cache_good_sharedata(self, verinfo, shnum, now, data):
"""
If one of my queries returns successfully (which means that we
successfully validated the signature), I
cache the data that we initially fetched from the storage
server. This will help reduce the number of roundtrips that need
to occur when the file is downloaded, or when the file is
updated.
"""
if verinfo:
self._node._add_to_cache(verinfo, shnum, 0, data)
def _got_results(self, datavs, peerid, readsize, stuff, started):
lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares",
peerid=idlib.shortnodeid_b2a(peerid),
numshares=len(datavs),
level=log.NOISY)
numshares=len(datavs))
now = time.time()
elapsed = now - started
self._queries_outstanding.discard(peerid)
self._servermap.reachable_peers.add(peerid)
self._must_query.discard(peerid)
self._queries_completed += 1
def _done_processing(ignored=None):
self._queries_outstanding.discard(peerid)
self._servermap.reachable_peers.add(peerid)
self._must_query.discard(peerid)
self._queries_completed += 1
if not self._running:
self.log("but we're not running, so we'll ignore it", parent=lp,
level=log.NOISY)
self.log("but we're not running, so we'll ignore it", parent=lp)
_done_processing()
self._status.add_per_server_time(peerid, "late", started, elapsed)
return
self._status.add_per_server_time(peerid, "query", started, elapsed)
@@ -575,107 +658,209 @@ class ServermapUpdater:
else:
self._empty_peers.add(peerid)
last_verinfo = None
last_shnum = None
ss, storage_index = stuff
ds = []
for shnum,datav in datavs.items():
data = datav[0]
try:
verinfo = self._got_results_one_share(shnum, data, peerid, lp)
last_verinfo = verinfo
last_shnum = shnum
self._node._add_to_cache(verinfo, shnum, 0, data)
except CorruptShareError, e:
# log it and give the other shares a chance to be processed
f = failure.Failure()
self.log(format="bad share: %(f_value)s", f_value=str(f.value),
failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
self.notify_server_corruption(peerid, shnum, str(e))
self._bad_peers.add(peerid)
self._last_failure = f
checkstring = data[:SIGNED_PREFIX_LENGTH]
self._servermap.mark_bad_share(peerid, shnum, checkstring)
self._servermap.problems.append(f)
pass
reader = MDMFSlotReadProxy(ss,
storage_index,
shnum,
data)
self._readers.setdefault(peerid, dict())[shnum] = reader
# our goal, with each response, is to validate the version
# information and share data as best we can at this point --
# we do this by validating the signature. To do this, we
# need to do the following:
# - If we don't already have the public key, fetch the
# public key. We use this to validate the signature.
if not self._node.get_pubkey():
# fetch and set the public key.
d = reader.get_verification_key(queue=True)
d.addCallback(lambda results, shnum=shnum, peerid=peerid:
self._try_to_set_pubkey(results, peerid, shnum, lp))
# XXX: Make self._pubkey_query_failed?
d.addErrback(lambda error, shnum=shnum, peerid=peerid:
self._got_corrupt_share(error, shnum, peerid, data, lp))
else:
# we already have the public key.
d = defer.succeed(None)
self._status.timings["cumulative_verify"] += (time.time() - now)
# Neither of these two branches returns anything of
# consequence, so the first entry in our deferredlist will
# be None.
if self._need_privkey and last_verinfo:
# send them a request for the privkey. We send one request per
# server.
lp2 = self.log("sending privkey request",
parent=lp, level=log.NOISY)
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
offsets_tuple) = last_verinfo
o = dict(offsets_tuple)
# - Next, we need the version information. We almost
# certainly got this by reading the first thousand or so
# bytes of the share on the storage server, so we
# shouldn't need to fetch anything at this step.
d2 = reader.get_verinfo()
d2.addErrback(lambda error, shnum=shnum, peerid=peerid:
self._got_corrupt_share(error, shnum, peerid, data, lp))
# - Next, we need the signature. For an SDMF share, it is
# likely that we fetched this when doing our initial fetch
# to get the version information. In MDMF, this lives at
# the end of the share, so unless the file is quite small,
# we'll need to do a remote fetch to get it.
d3 = reader.get_signature(queue=True)
d3.addErrback(lambda error, shnum=shnum, peerid=peerid:
self._got_corrupt_share(error, shnum, peerid, data, lp))
# Once we have all three of these responses, we can move on
# to validating the signature
self._queries_outstanding.add(peerid)
readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ]
ss = self._servermap.connections[peerid]
privkey_started = time.time()
d = self._do_read(ss, peerid, self._storage_index,
[last_shnum], readv)
d.addCallback(self._got_privkey_results, peerid, last_shnum,
privkey_started, lp2)
d.addErrback(self._privkey_query_failed, peerid, last_shnum, lp2)
d.addErrback(log.err)
d.addCallback(self._check_for_done)
d.addErrback(self._fatal_error)
# Does the node already have a privkey? If not, we'll try to
# fetch it here.
if self._need_privkey:
d4 = reader.get_encprivkey(queue=True)
d4.addCallback(lambda results, shnum=shnum, peerid=peerid:
self._try_to_validate_privkey(results, peerid, shnum, lp))
d4.addErrback(lambda error, shnum=shnum, peerid=peerid:
self._privkey_query_failed(error, shnum, data, lp))
else:
d4 = defer.succeed(None)
if self.fetch_update_data:
# fetch the block hash tree and first + last segment, as
# configured earlier.
# Then record them in the servermap so that they can be used
# later during the in-place update.
ds = []
# XXX: We do this above, too. Is there a good way to
# make the two routines share the value without
# introducing more roundtrips?
ds.append(reader.get_verinfo())
ds.append(reader.get_blockhashes(queue=True))
ds.append(reader.get_block_and_salt(self.start_segment,
queue=True))
ds.append(reader.get_block_and_salt(self.end_segment,
queue=True))
d5 = deferredutil.gatherResults(ds)
d5.addCallback(self._got_update_results_one_share, shnum)
else:
d5 = defer.succeed(None)
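# d  -> public key fetch (or None), d2 -> verinfo, d3 -> signature,
# d4 -> encrypted privkey (or None), d5 -> update data (or None).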
dl = defer.DeferredList([d, d2, d3, d4, d5])
dl.addBoth(self._turn_barrier)
reader.flush()
dl.addCallback(lambda results, shnum=shnum, peerid=peerid:
self._got_signature_one_share(results, shnum, peerid, lp))
dl.addErrback(lambda error, shnum=shnum, data=data:
self._got_corrupt_share(error, shnum, peerid, data, lp))
dl.addCallback(lambda verinfo, shnum=shnum, peerid=peerid, data=data:
self._cache_good_sharedata(verinfo, shnum, now, data))
ds.append(dl)
# dl is a deferred list that will fire when all of the shares
# that we found on this peer are done processing. When dl fires,
# we know that processing is done, so we can decrement the
# semaphore-like thing that we incremented earlier.
dl = defer.DeferredList(ds, fireOnOneErrback=True)
# Are we done? Done means that there are no more queries to
# send, that there are no outstanding queries, and that we
# haven't received any queries that are still processing. If we
# are done, self._check_for_done will cause the done deferred
# that we returned to our caller to fire, which tells them that
# they have a complete servermap, and that we won't be touching
# the servermap anymore.
dl.addCallback(_done_processing)
dl.addCallback(self._check_for_done)
dl.addErrback(self._fatal_error)
# all done!
self.log("_got_results done", parent=lp, level=log.NOISY)
return dl
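In outline, the "semaphore-like" completion tracking described in the comments above looks like this (a sketch, not the real implementation):

from twisted.internet import defer

queries_outstanding = set()

def check_for_done():
    if not queries_outstanding:
        print "servermap update complete"

def got_results(peerid, share_deferreds):
    # the peer stays "outstanding" until every share fetched from it
    # has finished its validation pipeline
    dl = defer.DeferredList(share_deferreds, fireOnOneErrback=True)
    def _finished(ign):
        queries_outstanding.discard(peerid)
        check_for_done()
    dl.addCallback(_finished)
    return dl

queries_outstanding.add("peer-1")
got_results("peer-1", [defer.succeed(None), defer.succeed(None)])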
def _turn_barrier(self, result):
"""
I help the servermap updater avoid the recursion limit issues
discussed in #237.
"""
return fireEventually(result)
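For reference, fireEventually(result) hands back a Deferred that fires with result on a later reactor turn; inserting it into the chain (the _turn_barrier above) breaks up long runs of synchronous callbacks, which is what triggered the recursion-limit problem referenced in #237. A minimal sketch:

from twisted.internet import reactor
from foolscap.api import fireEventually

def _report(res):
    print res          # printed from a later reactor turn
    reactor.stop()

d = fireEventually("share processed")
d.addCallback(_report)
reactor.run()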
def _try_to_set_pubkey(self, pubkey_s, peerid, shnum, lp):
if self._node.get_pubkey():
return # don't go through this again if we don't have to
fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
assert len(fingerprint) == 32
if fingerprint != self._node.get_fingerprint():
raise CorruptShareError(peerid, shnum,
"pubkey doesn't match fingerprint")
self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))
assert self._node.get_pubkey()
def notify_server_corruption(self, peerid, shnum, reason):
ss = self._servermap.connections[peerid]
ss.callRemoteOnly("advise_corrupt_share",
"mutable", self._storage_index, shnum, reason)
def _got_results_one_share(self, shnum, data, peerid, lp):
def _got_signature_one_share(self, results, shnum, peerid, lp):
# It is our job to give versioninfo to our caller. We need to
# raise CorruptShareError if the share is corrupt for any
# reason, something that our caller will handle.
self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
shnum=shnum,
peerid=idlib.shortnodeid_b2a(peerid),
level=log.NOISY,
parent=lp)
if not self._running:
# We can't process the results, since we can't touch the
# servermap anymore.
self.log("but we're not running anymore.")
return None
# this might raise NeedMoreDataError, if the pubkey and signature
# live at some weird offset. That shouldn't happen, so I'm going to
# treat it as a bad share.
(seqnum, root_hash, IV, k, N, segsize, datalength,
pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)
if not self._node.get_pubkey():
fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
assert len(fingerprint) == 32
if fingerprint != self._node.get_fingerprint():
raise CorruptShareError(peerid, shnum,
"pubkey doesn't match fingerprint")
self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))
if self._need_privkey:
self._try_to_extract_privkey(data, peerid, shnum, lp)
(ig_version, ig_seqnum, ig_root_hash, ig_IV, ig_k, ig_N,
ig_segsize, ig_datalen, offsets) = unpack_header(data)
_, verinfo, signature, __, ___ = results
(seqnum,
root_hash,
saltish,
segsize,
datalen,
k,
n,
prefix,
offsets) = verinfo[1]
offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
verinfo = (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
# XXX: This should be done for us in the method, so
# presumably you can go in there and fix it.
verinfo = (seqnum,
root_hash,
saltish,
segsize,
datalen,
k,
n,
prefix,
offsets_tuple)
# This tuple uniquely identifies a version of the file on the grid;
# we use it to keep track of the versions that we've already seen.
if verinfo not in self._valid_versions:
# it's a new pair. Verify the signature.
valid = self._node.get_pubkey().verify(prefix, signature)
# This is a new version tuple, and we need to validate it
# against the public key before keeping track of it.
assert self._node.get_pubkey()
valid = self._node.get_pubkey().verify(prefix, signature[1])
if not valid:
raise CorruptShareError(peerid, shnum, "signature is invalid")
raise CorruptShareError(peerid, shnum,
"signature is invalid")
# ok, it's a valid verinfo. Add it to the list of validated
# versions.
self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
% (seqnum, base32.b2a(root_hash)[:4],
idlib.shortnodeid_b2a(peerid), shnum,
k, N, segsize, datalength),
parent=lp)
self._valid_versions.add(verinfo)
# We now know that this is a valid candidate verinfo.
# ok, it's a valid verinfo. Add it to the list of validated
# versions.
self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
% (seqnum, base32.b2a(root_hash)[:4],
idlib.shortnodeid_b2a(peerid), shnum,
k, n, segsize, datalen),
parent=lp)
self._valid_versions.add(verinfo)
# We now know that this is a valid candidate verinfo. Whether or
# not this instance of it is valid is a matter for the next
# statement; at this point, we just know that if we see this
# version info again, that its signature checks out and that
# we're okay to skip the signature-checking step.
# (peerid, shnum) are bound in the method invocation.
if (peerid, shnum) in self._servermap.bad_shares:
# we've been told that the rest of the data in this share is
# unusable, so don't add it to the servermap.
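The "verify the signature only once per version" bookkeeping above reduces to the following miniature sketch, where verify_fn stands in for self._node.get_pubkey().verify:

valid_versions = set()

def check_version(verinfo, prefix, signature, verify_fn):
    if verinfo in valid_versions:
        return verinfo          # signature already validated for this version
    if not verify_fn(prefix, signature):
        raise ValueError("signature is invalid")   # CorruptShareError in the real code
    valid_versions.add(verinfo)
    return verinfo

assert check_version("v1", "prefix", "sig", lambda p, s: True) == "v1"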
@@ -688,43 +873,56 @@ class ServermapUpdater:
self._servermap.add_new_share(peerid, shnum, verinfo, timestamp)
# and the versionmap
self.versionmap.add(verinfo, (shnum, peerid, timestamp))
return verinfo
def _got_update_results_one_share(self, results, share):
"""
I extract the update data from results and record it in the servermap
for the given share.
"""
assert len(results) == 4
verinfo, blockhashes, start, end = results
(seqnum,
root_hash,
saltish,
segsize,
datalen,
k,
n,
prefix,
offsets) = verinfo
offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
# XXX: This should be done for us in the method, so
# presumably you can go in there and fix it.
verinfo = (seqnum,
root_hash,
saltish,
segsize,
datalen,
k,
n,
prefix,
offsets_tuple)
update_data = (blockhashes, start, end)
self._servermap.set_update_data_for_share_and_verinfo(share,
verinfo,
update_data)
def _deserialize_pubkey(self, pubkey_s):
verifier = rsa.create_verifying_key_from_string(pubkey_s)
return verifier
def _try_to_extract_privkey(self, data, peerid, shnum, lp):
try:
r = unpack_share(data)
except NeedMoreDataError, e:
# this share won't help us. oh well.
offset = e.encprivkey_offset
length = e.encprivkey_length
self.log("shnum %d on peerid %s: share was too short (%dB) "
"to get the encprivkey; [%d:%d] ought to hold it" %
(shnum, idlib.shortnodeid_b2a(peerid), len(data),
offset, offset+length),
parent=lp)
# NOTE: if uncoordinated writes are taking place, someone might
# change the share (and most probably move the encprivkey) before
# we get a chance to do one of these reads and fetch it. This
# will cause us to see a NotEnoughSharesError(unable to fetch
# privkey) instead of an UncoordinatedWriteError . This is a
# nuisance, but it will go away when we move to DSA-based mutable
# files (since the privkey will be small enough to fit in the
# write cap).
return
(seqnum, root_hash, IV, k, N, segsize, datalen,
pubkey, signature, share_hash_chain, block_hash_tree,
share_data, enc_privkey) = r
return self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)
def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):
"""
Given an encrypted private key from a remote server, I decrypt it,
derive the writekey from it, and validate that against the writekey
stored in my node. If it is valid, then I set the privkey and
encprivkey properties of the node.
"""
alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
if alleged_writekey != self._node.get_writekey():
@@ -797,20 +995,6 @@ class ServermapUpdater:
self._queries_completed += 1
self._last_failure = f
def _got_privkey_results(self, datavs, peerid, shnum, started, lp):
now = time.time()
elapsed = now - started
self._status.add_per_server_time(peerid, "privkey", started, elapsed)
self._queries_outstanding.discard(peerid)
if not self._need_privkey:
return
if shnum not in datavs:
self.log("privkey wasn't there when we asked it",
level=log.WEIRD, umid="VA9uDQ")
return
datav = datavs[shnum]
enc_privkey = datav[0]
self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)
def _privkey_query_failed(self, f, peerid, shnum, lp):
self._queries_outstanding.discard(peerid)
@@ -825,12 +1009,12 @@ class ServermapUpdater:
self._servermap.problems.append(f)
self._last_failure = f
def _check_for_done(self, res):
# exit paths:
# return self._send_more_queries(outstanding) : send some more queries
# return self._done() : all done
# return : keep waiting, no new queries
lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
"%(outstanding)d queries outstanding, "
"%(extra)d extra peers available, "
@@ -1022,6 +1206,7 @@ class ServermapUpdater:
def _done(self):
if not self._running:
self.log("not running; we're already done")
return
self._running = False
now = time.time()
@@ -1036,6 +1221,7 @@ class ServermapUpdater:
self._servermap.last_update_time = self._started
# the servermap will not be touched after this
self.log("servermap: %s" % self._servermap.summarize_versions())
eventually(self._done_deferred.callback, self._servermap)
def _fatal_error(self, f):