tahoe-lafs/src/allmydata/upload.py

321 lines
11 KiB
Python

from zope.interface import implements
from twisted.python import failure, log
from twisted.internet import defer
from twisted.application import service
from foolscap import Referenceable
from allmydata.util import idlib, bencode
from allmydata.util.idlib import peerid_to_short_string as shortid
from allmydata.util.deferredutil import DeferredListShouldSucceed
from allmydata import codec
from allmydata.uri import pack_uri
from allmydata.interfaces import IUploadable, IUploader
from cStringIO import StringIO
import sha
class NotEnoughPeersError(Exception):
pass
class HaveAllPeersError(Exception):
# we use this to jump out of the loop
pass
# this wants to live in storage, not here
class TooFullError(Exception):
pass
class FileUploader:
debug = False
def __init__(self, peer):
self._peer = peer
def set_params(self, min_shares, target_goodness, max_shares):
self.min_shares = min_shares
self.target_goodness = target_goodness
self.max_shares = max_shares
def set_filehandle(self, filehandle):
self._filehandle = filehandle
filehandle.seek(0, 2)
self._size = filehandle.tell()
filehandle.seek(0)
def set_verifierid(self, vid):
assert isinstance(vid, str)
assert len(vid) == 20
self._verifierid = vid
def start(self):
"""Start uploading the file.
The source of the data to be uploaded must have been set before this
point by calling set_filehandle().
This method returns a Deferred that will fire with the URI (a
string)."""
log.msg("starting upload [%s]" % (idlib.b2a(self._verifierid),))
if self.debug:
print "starting upload"
assert self.min_shares
assert self.target_goodness
# create the encoder, so we can know how large the shares will be
total_shares = self.max_shares
needed_shares = self.min_shares
self._encoder = codec.ReplicatingEncoder()
self._codec_name = self._encoder.get_encoder_type()
self._encoder.set_params(self._size, needed_shares, total_shares)
self._share_size = self._encoder.get_share_size()
# first step: who should we upload to?
# We will talk to at most max_peers (which can be None to mean no
# limit). Maybe limit max_peers to 2*len(self.shares), to reduce
# memory footprint. For now, make it unlimited.
max_peers = None
self.permuted = self._peer.permute_peerids(self._verifierid, max_peers)
self.peers_who_said_yes = []
self.peers_who_said_no = []
self.peers_who_had_errors = []
self._total_peers = len(self.permuted)
for p in self.permuted:
assert isinstance(p, str)
# we will shrink self.permuted as we give up on peers
d = defer.maybeDeferred(self._find_peers)
d.addCallback(self._got_enough_peers)
d.addCallback(self._compute_uri)
return d
def _compute_uri(self, params):
return pack_uri(self._codec_name, params, self._verifierid)
def _build_not_enough_peers_error(self):
yes = ",".join([shortid(p) for p in self.peers_who_said_yes])
no = ",".join([shortid(p) for p in self.peers_who_said_no])
err = ",".join([shortid(p) for p in self.peers_who_had_errors])
msg = ("%s goodness, want %s, have %d "
"landlords, %d total peers, "
"peers:yes=%s;no=%s;err=%s" %
(self.goodness_points, self.target_goodness,
len(self.landlords), self._total_peers,
yes, no, err))
return msg
def _find_peers(self):
# this returns a Deferred which fires (with a meaningless value) when
# enough peers are found, or errbacks with a NotEnoughPeersError if
# not.
self.peer_index = 0
self.goodness_points = 0
self.landlords = [] # list of (peerid, bucket_num, remotebucket)
return self._check_next_peer()
def _check_next_peer(self):
if self.debug:
log.msg("FileUploader._check_next_peer: %d permuted, %d goodness"
" (want %d), have %d landlords, %d total peers" %
(len(self.permuted), self.goodness_points,
self.target_goodness, len(self.landlords),
self._total_peers))
if (self.goodness_points >= self.target_goodness and
len(self.landlords) >= self.min_shares):
if self.debug: print " we're done!"
return "done"
if not self.permuted:
# we've run out of peers to check without finding enough, which
# means we won't be able to upload this file. Bummer.
msg = self._build_not_enough_peers_error()
log.msg("NotEnoughPeersError: %s" % msg)
raise NotEnoughPeersError(msg)
# otherwise we use self.peer_index to rotate through all the usable
# peers. It gets inremented elsewhere, but wrapped here.
if self.peer_index >= len(self.permuted):
self.peer_index = 0
peerid = self.permuted[self.peer_index]
d = self._check_peer(peerid)
d.addCallback(lambda res: self._check_next_peer())
return d
def _check_peer(self, peerid):
# contact a single peer, and ask them to hold a share. If they say
# yes, we update self.landlords and self.goodness_points, and
# increment self.peer_index. If they say no, or are uncontactable, we
# remove them from self.permuted. This returns a Deferred which never
# errbacks.
bucket_num = len(self.landlords)
d = self._peer.get_remote_service(peerid, "storageserver")
def _got_peer(service):
if self.debug: print "asking %s" % shortid(peerid)
d2 = service.callRemote("allocate_bucket",
verifierid=self._verifierid,
bucket_num=bucket_num,
size=self._share_size,
leaser=self._peer.nodeid,
canary=Referenceable())
return d2
d.addCallback(_got_peer)
def _allocate_response(bucket):
if self.debug:
print " peerid %s will grant us a lease" % shortid(peerid)
self.peers_who_said_yes.append(peerid)
self.landlords.append( (peerid, bucket_num, bucket) )
self.goodness_points += 1
self.peer_index += 1
d.addCallback(_allocate_response)
def _err(f):
if self.debug: print "err from peer %s:" % idlib.b2a(peerid)
assert isinstance(f, failure.Failure)
if f.check(TooFullError):
if self.debug: print " too full"
self.peers_who_said_no.append(peerid)
elif f.check(IndexError):
if self.debug: print " no connection"
self.peers_who_had_errors.append(peerid)
else:
if self.debug: print " other error:", f
self.peers_who_had_errors.append(peerid)
log.msg("FileUploader._check_peer(%s): err" % shortid(peerid))
log.msg(f)
self.permuted.remove(peerid) # this peer was unusable
return None
d.addErrback(_err)
return d
def _got_enough_peers(self, res):
landlords = self.landlords
if self.debug:
log.msg("FileUploader._got_enough_peers")
log.msg(" %d landlords" % len(landlords))
if len(landlords) < 20:
log.msg(" peerids: %s" % " ".join([idlib.b2a(l[0])
for l in landlords]))
log.msg(" buckets: %s" % " ".join([str(l[1])
for l in landlords]))
# assign shares to landlords
self.sharemap = {}
for peerid, bucket_num, bucket in landlords:
self.sharemap[bucket_num] = bucket
# the sharemap should have exactly len(landlords) shares, with
# no holes
assert sorted(self.sharemap.keys()) == range(len(landlords))
# encode all the data at once: this class does not use segmentation
data = self._filehandle.read()
d = self._encoder.encode(data, self.sharemap.keys())
d.addCallback(self._send_all_shares)
d.addCallback(lambda res: self._encoder.get_serialized_params())
return d
def _send_one_share(self, bucket, sharedata, metadata):
d = bucket.callRemote("write", sharedata)
d.addCallback(lambda res:
bucket.callRemote("set_metadata", metadata))
d.addCallback(lambda res:
bucket.callRemote("close"))
return d
def _send_all_shares(self, (shares, shareids)):
dl = []
for (shareid, share) in zip(shareids, shares):
if self.debug:
log.msg(" writing share %d" % shareid)
metadata = bencode.bencode(shareid)
assert len(share) == self._share_size
assert isinstance(share, str)
bucket = self.sharemap[shareid]
d = self._send_one_share(bucket, share, metadata)
dl.append(d)
return DeferredListShouldSucceed(dl)
def netstring(s):
return "%d:%s," % (len(s), s)
class FileName:
implements(IUploadable)
def __init__(self, filename):
self._filename = filename
def get_filehandle(self):
return open(self._filename, "rb")
def close_filehandle(self, f):
f.close()
class Data:
implements(IUploadable)
def __init__(self, data):
self._data = data
def get_filehandle(self):
return StringIO(self._data)
def close_filehandle(self, f):
pass
class FileHandle:
implements(IUploadable)
def __init__(self, filehandle):
self._filehandle = filehandle
def get_filehandle(self):
return self._filehandle
def close_filehandle(self, f):
# the originator of the filehandle reserves the right to close it
pass
class Uploader(service.MultiService):
"""I am a service that allows file uploading.
"""
implements(IUploader)
name = "uploader"
uploader_class = FileUploader
debug = False
def _compute_verifierid(self, f):
hasher = sha.new(netstring("allmydata_v1_verifierid"))
f.seek(0)
data = f.read()
hasher.update(data)#f.read())
f.seek(0)
# note: this is only of the plaintext data, no encryption yet
return hasher.digest()
def upload(self, f):
# this returns the URI
assert self.parent
assert self.running
f = IUploadable(f)
fh = f.get_filehandle()
u = self.uploader_class(self.parent)
if self.debug:
u.debug = True
u.set_filehandle(fh)
# push two shares, require that we get two back. TODO: this is
# temporary, of course.
u.set_params(2, 2, 4)
u.set_verifierid(self._compute_verifierid(fh))
d = u.start()
def _done(res):
f.close_filehandle(fh)
return res
d.addBoth(_done)
return d
# utility functions
def upload_data(self, data):
return self.upload(Data(data))
def upload_filename(self, filename):
return self.upload(FileName(filename))
def upload_filehandle(self, filehandle):
return self.upload(FileHandle(filehandle))