tahoe-lafs/allmydata/upload.py
2006-12-03 03:01:43 -07:00

166 lines
5.3 KiB
Python

from twisted.python import failure, log
from twisted.internet import defer
from twisted.application import service
from allmydata.util import idlib
from allmydata import encode
from cStringIO import StringIO
import sha
class NotEnoughPeersError(Exception):
pass
class HaveAllPeersError(Exception):
# we use this to jump out of the loop
pass
# this wants to live in storage, not here
class TooFullError(Exception):
pass
class FileUploader:
debug = False
def __init__(self, peer):
self._peer = peer
def set_filehandle(self, filehandle):
self._filehandle = filehandle
filehandle.seek(0, 2)
self._size = filehandle.tell()
filehandle.seek(0)
def make_encoder(self):
self._shares = 4
self._encoder = encode.Encoder(self._filehandle, self._shares)
self._share_size = self._size
def set_verifierid(self, vid):
assert isinstance(vid, str)
self._verifierid = vid
def start(self):
log.msg("starting upload")
if self.debug:
print "starting upload"
# first step: who should we upload to?
# maybe limit max_peers to 2*len(self.shares), to reduce memory
# footprint
max_peers = None
self.permuted = self._peer.permute_peerids(self._verifierid, max_peers)
for p in self.permuted:
assert isinstance(p, str)
# we will shrink self.permuted as we give up on peers
self.peer_index = 0
self.goodness_points = 0
self.target_goodness = self._shares
self.landlords = [] # list of (peerid, bucket_num, remotebucket)
d = defer.maybeDeferred(self._check_next_peer)
d.addCallback(self._got_all_peers)
return d
def _check_next_peer(self):
if len(self.permuted) == 0:
# there are no more to check
raise NotEnoughPeersError
if self.peer_index >= len(self.permuted):
self.peer_index = 0
peerid = self.permuted[self.peer_index]
d = self._peer.get_remote_service(peerid, "storageserver")
def _got_peer(service):
bucket_num = len(self.landlords)
if self.debug: print "asking %s" % idlib.b2a(peerid)
d2 = service.callRemote("allocate_bucket",
verifierid=self._verifierid,
bucket_num=bucket_num,
size=self._share_size,
leaser=self._peer.nodeid)
def _allocate_response(bucket):
if self.debug:
print " peerid %s will grant us a lease" % idlib.b2a(peerid)
self.landlords.append( (peerid, bucket_num, bucket) )
self.goodness_points += 1
if self.goodness_points >= self.target_goodness:
if self.debug: print " we're done!"
raise HaveAllPeersError()
# otherwise we fall through to allocate more peers
d2.addCallback(_allocate_response)
return d2
d.addCallback(_got_peer)
def _done_with_peer(res):
if self.debug: print "done with peer %s:" % idlib.b2a(peerid)
if isinstance(res, failure.Failure):
if res.check(HaveAllPeersError):
if self.debug: print " all done"
# we're done!
return
if res.check(TooFullError):
if self.debug: print " too full"
elif res.check(IndexError):
if self.debug: print " no connection"
else:
if self.debug: print " other error:", res
self.permuted.remove(peerid) # this peer was unusable
else:
if self.debug: print " they gave us a lease"
# we get here for either good peers (when we still need
# more), or after checking a bad peer (and thus still need
# more). So now we need to grab a new peer.
self.peer_index += 1
return self._check_next_peer()
d.addBoth(_done_with_peer)
return d
def _got_all_peers(self, res):
d = self._encoder.do_upload(self.landlords)
d.addCallback(lambda res: self._verifierid)
return d
def netstring(s):
return "%d:%s," % (len(s), s)
class Uploader(service.MultiService):
"""I am a service that allows file uploading.
"""
name = "uploader"
def _compute_verifierid(self, f):
hasher = sha.new(netstring("allmydata_v1_verifierid"))
f.seek(0)
hasher.update(f.read())
f.seek(0)
# note: this is only of the plaintext data, no encryption yet
return hasher.digest()
def upload_filename(self, filename):
f = open(filename, "rb")
def _done(res):
f.close()
return res
d = self.upload_filehandle(f)
d.addBoth(_done)
return d
def upload_data(self, data):
f = StringIO(data)
return self.upload_filehandle(f)
def upload_filehandle(self, f):
assert self.parent
assert self.running
u = FileUploader(self.parent)
u.set_filehandle(f)
u.set_verifierid(self._compute_verifierid(f))
u.make_encoder()
d = u.start()
return d