mutable: get most of the retrieve-side code written. no tests yet.

This commit is contained in:
Brian Warner 2007-11-06 03:47:29 -07:00
parent a431411418
commit 207888a97b


@@ -1,9 +1,12 @@
import os, struct, itertools
import os, struct
from itertools import islice
from zope.interface import implements
from twisted.internet import defer
from twisted.python import failure
from foolscap.eventual import eventually
from allmydata.interfaces import IMutableFileNode, IMutableFileURI
from allmydata.util import hashutil, mathutil
from allmydata.util import hashutil, mathutil, idlib
from allmydata.uri import WriteableSSKFileURI
from allmydata.Crypto.Cipher import AES
from allmydata import hashtree, codec
@@ -18,8 +21,49 @@ class NeedMoreDataError(Exception):
class UncoordinatedWriteError(Exception):
pass
class CorruptShareError(Exception):
def __init__(self, peerid, shnum, reason):
self.peerid = peerid
self.shnum = shnum
self.reason = reason
def __repr__(self):
# TODO: in some places we use idlib.b2a, in others (foolscap) we use
# stdlib b32encode. Fix this discrepancy.
short_peerid = idlib.b2a(self.peerid)[:8]
return "<CorruptShareError peerid=%s shnum[%d]: %s" % (short_peerid,
self.shnum,
self.reason)
HEADER_LENGTH = struct.calcsize(">BQ32s BBQQ LLLLLQQ")
def unpack_prefix_and_signature(data):
assert len(data) >= HEADER_LENGTH
o = {}
prefix = data[:struct.calcsize(">BQ32s BBQQ")]
(version,
seqnum,
root_hash,
k, N, segsize, datalen,
o['signature'],
o['share_hash_chain'],
o['block_hash_tree'],
o['IV'],
o['share_data'],
o['enc_privkey'],
o['EOF']) = struct.unpack(">BQ32s BBQQ LLLLLQQ",
data[:HEADER_LENGTH])
assert version == 0
if len(data) < o['share_hash_chain']:
raise NeedMoreDataError(o['share_hash_chain'])
pubkey_s = data[HEADER_LENGTH:o['signature']]
signature = data[o['signature']:o['share_hash_chain']]
return (seqnum, root_hash, k, N, segsize, datalen,
pubkey_s, signature, prefix)
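
To make the offset layout concrete, here is a small hypothetical sketch (not part of this commit) that packs a synthetic header with struct.pack and runs it back through unpack_prefix_and_signature. The pubkey and signature strings are placeholders, not real key material.

# hypothetical sketch: exercise unpack_prefix_and_signature with fake data
fake_root_hash = "\x00" * 32
fake_pubkey = "p" * 100      # stands in for the DER-encoded verification key
fake_sig = "s" * 40          # stands in for the signature over the prefix
o_sig = HEADER_LENGTH + len(fake_pubkey)   # pubkey sits right after the header
o_shc = o_sig + len(fake_sig)              # signature ends where share_hash_chain starts
header = struct.pack(">BQ32s BBQQ LLLLLQQ",
                     0,                  # version
                     3,                  # seqnum
                     fake_root_hash,
                     3, 10,              # k, N
                     2000, 2000,         # segsize, datalen
                     o_sig, o_shc,       # offsets of signature, share_hash_chain
                     o_shc, o_shc, o_shc, o_shc, o_shc)  # remaining offsets, unused here
data = header + fake_pubkey + fake_sig
(seqnum, root_hash, k, N, segsize, datalen,
 pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)
assert (seqnum, k, N) == (3, 3, 10)
assert pubkey_s == fake_pubkey and signature == fake_sig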
def unpack_share(data):
assert len(data) >= HEADER_LENGTH
o = {}
@@ -199,6 +243,8 @@ class MutableFileNode:
return self._writekey
def get_readkey(self):
return self._readkey
def get_storage_index(self):
return self._storage_index
def get_privkey(self):
return self._privkey
def get_encprivkey(self):
@@ -251,6 +297,258 @@ class MutableFileNode:
class Retrieve:
def __init__(self, filenode):
self._node = filenode
self._contents = None
# if the filenode already has a copy of the pubkey, use it. Otherwise
# we'll grab a copy from the first peer we talk to.
self._pubkey = filenode.get_pubkey()
self._storage_index = filenode.get_storage_index()
def retrieve(self):
"""Retrieve the filenode's current contents. Returns a Deferred that
fires with a string when the contents have been retrieved."""
# 1: make a guess as to how many peers we should send requests to. We
# want to hear from k+EPSILON (k because we have to, EPSILON extra
# because that helps us resist rollback attacks). [TRADEOFF:
# EPSILON>0 means extra work] [TODO: implement EPSILON>0]
# 2: build the permuted peerlist, taking the first k+E peers
# 3: send readv requests to all of them in parallel, asking for the
# first 2KB of data from all shares
# 4: when the first of the responses comes back, extract information:
# 4a: extract the pubkey, hash it, compare against the URI. If this
# check fails, log a WEIRD and ignore the peer.
# 4b: extract the prefix (seqnum, roothash, k, N, segsize, datalength)
# and verify the signature on it. If this is wrong, log a WEIRD
# and ignore the peer. Save the prefix string in a dict that's
# keyed by (seqnum,roothash) and has (prefixstring, sharemap) as
# values. We'll use the prefixstring again later to avoid doing
# multiple signature checks
# 4c: extract the share size (offset of the last byte of sharedata).
# if it is larger than 2k, send new readv requests to pull down
# the extra data
# 4d: if the extracted 'k' is more than we guessed, rebuild a larger
# permuted peerlist and send out more readv requests.
# 5: as additional responses come back, extract the prefix and compare
# against the ones we've already seen. If they match, add the
# peerid to the corresponding sharemap dict
# 6: [TRADEOFF]: if EPSILON==0, when we get k responses for the
# same (seqnum,roothash) key, attempt to reconstruct that data.
# if EPSILON>0, wait for k+EPSILON responses, then attempt to
# reconstruct the most popular version. If we do not have enough
# shares and there are still requests outstanding, wait. If there
# are no requests still outstanding (todo: configurable), send
# more requests. Never send queries to more than 2*N servers. If
# we've run out of servers, fail.
# 7: if we discover corrupt shares during the reconstruction process,
# remove them from the sharemap and start step #6 again.
initial_query_count = 5
self._read_size = 2000
# we might not know how many shares we need yet.
self._required_shares = self._node.get_required_shares()
self._total_shares = self._node.get_total_shares()
self._segsize = None
self._datalength = None
d = defer.succeed(initial_query_count)
d.addCallback(self._choose_initial_peers)
d.addCallback(self._send_initial_requests)
d.addCallback(lambda res: self._contents)
return d
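
As a companion to the control-flow outline above, here is a hypothetical caller-side sketch (not part of this commit) showing how the Deferred returned by retrieve() would be consumed; the helper name is made up.

def dump_mutable_contents(filenode):
    # hypothetical helper: kick off a retrieve and log the recovered size
    r = Retrieve(filenode)
    d = r.retrieve()
    def _show(contents):
        print "recovered %d bytes" % len(contents)
        return contents
    d.addCallback(_show)
    return d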
def _choose_initial_peers(self, numqueries):
n = self._node
full_peerlist = n._client.get_permuted_peers(self._storage_index,
include_myself=True)
# _peerlist is a list of (peerid,conn) tuples for peers that are
# worth talking to. This starts with the first numqueries in the
# permuted list. If that's not enough to get us a recoverable
# version, we expand this to include the first 2*total_shares peerids
# (assuming we learn what total_shares is from one of the first
# numqueries peers)
self._peerlist = [(p[1],p[2])
for p in islice(full_peerlist, numqueries)]
# _peerlist_limit is the query limit we used to build this list. If
# we later increase this limit, it may be useful to re-scan the
# permuted list.
self._peerlist_limit = numqueries
return self._peerlist
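
A small hypothetical illustration of the slicing above, assuming get_permuted_peers() yields (permutedid, peerid, connection) triples in permuted order; only the (peerid, connection) pair is kept, which is the shape the rest of the class expects.

# fake permuted peerlist, standing in for client.get_permuted_peers()
fake_full_peerlist = [(i, "peerid-%d" % i, "conn-%d" % i) for i in range(20)]
peerlist = [(p[1], p[2]) for p in islice(fake_full_peerlist, 5)]
assert len(peerlist) == 5
assert peerlist[0] == ("peerid-0", "conn-0")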
def _send_initial_requests(self, peerlist):
self._bad_peerids = set()
self._used_peers = set()
self._valid_versions = {} # maps (seqnum,root_hash) -> (prefix, sharemap)
self._running = True
self._queries_outstanding = set()
self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
self._peer_storage_servers = {}
for (peerid, conn) in peerlist:
self._queries_outstanding.add(peerid)
self._do_query(conn, peerid, self._storage_index, self._read_size,
self._peer_storage_servers)
# control flow beyond this point: state machine. Receiving responses
# from queries is the input. We might send out more queries, or we
# might produce a result.
d = self._done_deferred = defer.Deferred()
return d
def _do_query(self, conn, peerid, storage_index, readsize,
peer_storage_servers):
self._queries_outstanding.add(peerid)
if peerid in peer_storage_servers:
d = defer.succeed(peer_storage_servers[peerid])
else:
d = conn.callRemote("get_service", "storageserver")
def _got_storageserver(ss):
peer_storage_servers[peerid] = ss
return ss
d.addCallback(_got_storageserver)
d.addCallback(lambda ss: ss.callRemote("readv_slots", [(0, readsize)]))
d.addCallback(self._got_results, peerid, readsize)
d.addErrback(self._query_failed, peerid, (conn, storage_index,
peer_storage_servers))
return d
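
For reference, a hypothetical sketch (not in this commit) of the response shape that _got_results expects from readv_slots: a dict mapping share number to a list of strings, one per read vector requested, so with a single (0, readsize) vector datav[0] holds the first readsize bytes of that share.

# hypothetical readv_slots response for two shares held by one peer
example_datavs = {
    0: ["<first 2000 bytes of share 0>"],   # datav[0] is the only read vector
    7: ["<first 2000 bytes of share 7>"],
}
for shnum, datav in example_datavs.items():
    data = datav[0]   # same access pattern as _got_results below
    assert isinstance(data, str)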
def _deserialize_pubkey(self, pubkey_s):
# TODO
return None
def _got_results(self, datavs, peerid, readsize):
self._queries_outstanding.discard(peerid)
self._used_peers.add(peerid)
if not self._running:
return
for shnum,datav in datavs.items():
data = datav[0]
(seqnum, root_hash, k, N, segsize, datalength,
pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)
if not self._pubkey:
fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
if fingerprint != self._node._fingerprint:
# bad share
raise CorruptShareError(peerid, shnum,
"pubkey doesn't match fingerprint")
self._pubkey = self._deserialize_pubkey(pubkey_s)
verinfo = (seqnum, root_hash)
if verinfo not in self._valid_versions:
# it's a new pair. Verify the signature.
valid = self._pubkey.verify(prefix, signature)
if not valid:
raise CorruptShareError(peerid, shnum,
"signature is invalid")
# ok, it's a valid verinfo. Add it to the list of validated
# versions.
self._valid_versions[verinfo] = (prefix, DictOfSets())
# and make a note of the other parameters we've just learned
if self._required_shares is None:
self._required_shares = k
if self._total_shares is None:
self._total_shares = N
if self._segsize is None:
self._segsize = segsize
if self._datalength is None:
self._datalength = datalength
# at this point the (seqnum,root_hash) pair has been validated (either
# just now or on an earlier response), so we know it's a valid
# candidate. Accumulate the share info, if there's enough data present.
# If not, raise NeedMoreDataError, which will trigger a re-fetch.
_ignored = unpack_share(data)
self._valid_versions[verinfo][1].add(shnum, (peerid, data))
self._check_for_done()
def _query_failed(self, f, peerid, stuff):
self._queries_outstanding.discard(peerid)
self._used_peers.add(peerid)
if not self._running:
return
if f.check(NeedMoreDataError):
# ah, just re-send the query then.
self._read_size = max(self._read_size, f.needed_bytes)
(conn, storage_index, peer_storage_servers) = stuff
self._do_query(conn, peerid, storage_index, self._read_size,
peer_storage_servers)
return
self._bad_peerids.add(peerid)
short_sid = idlib.b2a(self._storage_index)[:6]
if f.check(CorruptShareError):
self._node._client.log("WEIRD: bad share for %s: %s" %
(short_sid, f))
else:
self._node._client.log("WEIRD: other error for %s: %s" %
(short_sid, f))
self._check_for_done()
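
The NeedMoreDataError branch above assumes the exception carries a needed_bytes attribute recording how large a read would have sufficed. A hypothetical, self-contained sketch of that retry arithmetic (the exception class here is a stand-in, not the real one):

class _FakeNeedMoreData(Exception):
    # stand-in for NeedMoreDataError, assumed to record the required size
    def __init__(self, needed_bytes):
        Exception.__init__(self)
        self.needed_bytes = needed_bytes

read_size = 2000
try:
    raise _FakeNeedMoreData(3500)   # e.g. share_hash_chain starts at byte 3500
except _FakeNeedMoreData, e:
    read_size = max(read_size, e.needed_bytes)
assert read_size == 3500   # the re-sent query asks for the larger amount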
def _check_for_done(self):
share_prefixes = {}
versionmap = DictOfSets()
for prefix, sharemap in self._valid_versions.values():
if len(sharemap) >= self._required_shares:
# this one looks retrievable
try:
contents = self._extract_data(sharemap)
except CorruptShareError:
# log(WEIRD)
# _extract_data is responsible for removing the bad
# share, so we can just try again
return self._check_for_done()
# success!
return self._done(contents)
# we don't have enough shares yet. Should we send out more queries?
if self._queries_outstanding:
# there are some running, so just wait for them to come back.
# TODO: if our initial guess at k was too low, waiting for these
# responses before sending new queries will increase our latency,
# so we could speed things up by sending new requests earlier.
return
# no more queries are outstanding. Can we send out more? First,
# should we be looking at more peers?
if self._total_shares is not None:
search_distance = self._total_shares * 2
else:
search_distance = 20
if self._peerlist_limit < search_distance:
# we might be able to get some more peers from the list
peers = self._node._client.get_permuted_peers(self._storage_index,
include_myself=True)
self._peerlist = [(p[1],p[2])
for p in islice(peers, search_distance)]
self._peerlist_limit = search_distance
# are there any peers on the list that we haven't used?
new_query_peers = []
for (peerid, conn) in self._peerlist:
if peerid not in self._used_peers:
new_query_peers.append( (peerid, conn) )
if len(new_query_peers) > 5:
# only query in batches of 5. TODO: this is pretty
# arbitrary, really I want this to be something like
# k - max(known_version_sharecounts) + some extra
break
if new_query_peers:
for (peerid, conn) in new_query_peers:
self._do_query(conn, peerid,
self._storage_index, self._read_size,
self._peer_storage_servers)
# we'll retrigger when those queries come back
return
# we've used up all the peers we're allowed to search. Failure.
return self._done(failure.Failure(NotEnoughPeersError()))
def _done(self, contents):
self._running = False
# stash the recovered contents so retrieve()'s final callback can return them
self._contents = contents
eventually(self._done_deferred.callback, contents)
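
To summarize the bookkeeping used by _got_results and _check_for_done, here is a hypothetical sketch (not part of this commit) of how shares accumulate per version; it assumes DictOfSets behaves as its name and usage suggest, i.e. add() inserts the value into a per-key set.

valid_versions = {}
verinfo = (3, "\x11" * 32)                 # (seqnum, root_hash)
sharemap = DictOfSets()
sharemap.add(0, ("peerid-a", "<share 0 data>"))
sharemap.add(1, ("peerid-b", "<share 1 data>"))
sharemap.add(1, ("peerid-c", "<share 1 data>"))   # same shnum from another peer
valid_versions[verinfo] = ("<prefix bytes>", sharemap)
# a version becomes recoverable once it holds k distinct share numbers
k = 2
assert len(valid_versions[verinfo][1]) >= k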
class DictOfSets(dict):
def add(self, key, value):
@@ -417,7 +715,7 @@ class Publish:
self._new_root_hash = root_hash
self._new_shares = final_shares
storage_index = self._node._uri.storage_index
storage_index = self._node.get_storage_index()
peerlist = self._node._client.get_permuted_peers(storage_index,
include_myself=False)
# we don't include ourselves in the N peers, but we *do* push an
@@ -425,12 +723,13 @@ class Publish:
# the signing key around later. This way, even if all the servers die
# and the directory contents are unrecoverable, at least we can still
# push out a new copy with brand-new contents.
# TODO: actually push this copy
current_share_peers = DictOfSets()
reachable_peers = {}
EPSILON = total_shares / 2
partial_peerlist = itertools.islice(peerlist, total_shares + EPSILON)
partial_peerlist = islice(peerlist, total_shares + EPSILON)
peer_storage_servers = {}
dl = []
for (permutedid, peerid, conn) in partial_peerlist: