tahoe-lafs/src/allmydata/upload.py

467 lines
18 KiB
Python
Raw Normal View History

import os
from zope.interface import implements
2007-03-30 03:19:52 +00:00
from twisted.python import log
2006-12-01 09:54:28 +00:00
from twisted.internet import defer
from twisted.application import service
from foolscap import Referenceable
from allmydata.util import idlib, hashutil
from allmydata import encode, storage, hashtree, uri
from allmydata.interfaces import IUploadable, IUploader
2006-12-01 09:54:28 +00:00
2006-12-03 03:31:43 +00:00
from cStringIO import StringIO
import collections, random
2006-12-03 03:31:43 +00:00
2006-12-01 09:54:28 +00:00
class HaveAllPeersError(Exception):
# we use this to jump out of the loop
pass
# this wants to live in storage, not here
class TooFullError(Exception):
pass
2007-07-13 22:09:01 +00:00
# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
EXTENSION_SIZE = 1000
2007-03-30 03:19:52 +00:00
class PeerTracker:
def __init__(self, peerid, permutedid, connection,
sharesize, blocksize, num_segments, num_share_hashes,
crypttext_hash):
2007-03-30 03:19:52 +00:00
self.peerid = peerid
2007-03-30 21:54:33 +00:00
self.permutedid = permutedid
self.connection = connection # to an RIClient
2007-03-30 03:19:52 +00:00
self.buckets = {} # k: shareid, v: IRemoteBucketWriter
self.sharesize = sharesize
2007-07-13 22:09:01 +00:00
#print "PeerTracker", peerid, permutedid, sharesize
as = storage.allocated_size(sharesize,
num_segments,
num_share_hashes,
EXTENSION_SIZE)
2007-07-13 22:09:01 +00:00
self.allocated_size = as
2007-03-30 03:19:52 +00:00
self.blocksize = blocksize
self.num_segments = num_segments
self.num_share_hashes = num_share_hashes
self.crypttext_hash = crypttext_hash
self._storageserver = None
2007-03-30 03:19:52 +00:00
def query(self, sharenums):
if not self._storageserver:
d = self.connection.callRemote("get_service", "storageserver")
d.addCallback(self._got_storageserver)
d.addCallback(lambda res: self._query(sharenums))
return d
return self._query(sharenums)
def _got_storageserver(self, storageserver):
self._storageserver = storageserver
def _query(self, sharenums):
2007-07-13 22:09:01 +00:00
#print " query", self.peerid, len(sharenums)
d = self._storageserver.callRemote("allocate_buckets",
self.crypttext_hash,
2007-07-13 22:09:01 +00:00
sharenums,
self.allocated_size,
canary=Referenceable())
2007-03-30 03:19:52 +00:00
d.addCallback(self._got_reply)
return d
def _got_reply(self, (alreadygot, buckets)):
#log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
b = {}
for sharenum, rref in buckets.iteritems():
bp = storage.WriteBucketProxy(rref, self.sharesize,
self.blocksize,
self.num_segments,
self.num_share_hashes,
EXTENSION_SIZE)
b[sharenum] = bp
self.buckets.update(b)
return (alreadygot, set(b.keys()))
class Tahoe3PeerSelector:
def get_shareholders(self, client,
storage_index, share_size, block_size,
num_segments, total_shares, shares_of_happiness):
2007-06-12 02:21:51 +00:00
"""
@return: a set of PeerTracker instances that have agreed to hold some
shares for us
"""
self.total_shares = total_shares
self.shares_of_happiness = shares_of_happiness
# we are responsible for locating the shareholders. self._encoder is
# responsible for handling the data and sending out the shares.
peers = client.get_permuted_peers(storage_index)
2007-03-30 03:19:52 +00:00
assert peers
2007-07-13 22:09:01 +00:00
# this needed_hashes computation should mirror
# Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
# (instead of a HashTree) because we don't require actual hashing
# just to count the levels.
ht = hashtree.IncompleteHashTree(total_shares)
2007-07-13 22:09:01 +00:00
num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
trackers = [ PeerTracker(peerid, permutedid, conn,
share_size, block_size,
num_segments, num_share_hashes,
storage_index)
2007-03-30 03:19:52 +00:00
for permutedid, peerid, conn in peers ]
2007-03-30 21:54:33 +00:00
self.usable_peers = set(trackers) # this set shrinks over time
self.used_peers = set() # while this set grows
self.unallocated_sharenums = set(range(total_shares)) # this one shrinks
2007-03-30 03:19:52 +00:00
2007-03-30 21:54:33 +00:00
return self._locate_more_shareholders()
def _locate_more_shareholders(self):
2007-03-30 03:19:52 +00:00
d = self._query_peers()
2007-03-30 21:54:33 +00:00
d.addCallback(self._located_some_shareholders)
return d
2007-03-30 21:54:33 +00:00
def _located_some_shareholders(self, res):
log.msg("_located_some_shareholders")
log.msg(" still need homes for %d shares, still have %d usable peers"
% (len(self.unallocated_sharenums), len(self.usable_peers)))
2007-03-30 21:54:33 +00:00
if not self.unallocated_sharenums:
# Finished allocating places for all shares.
log.msg("%s._locate_all_shareholders() "
"Finished allocating places for all shares." % self)
2007-03-30 21:54:33 +00:00
log.msg("used_peers is %s" % (self.used_peers,))
return self.used_peers
if not self.usable_peers:
# Ran out of peers who have space.
log.msg("%s._locate_all_shareholders() "
"Ran out of peers who have space." % self)
margin = self.total_shares - self.shares_of_happiness
if len(self.unallocated_sharenums) < margin:
2007-03-30 21:54:33 +00:00
# But we allocated places for enough shares.
log.msg("%s._locate_all_shareholders() "
"But we allocated places for enough shares.")
2007-03-30 21:54:33 +00:00
return self.used_peers
raise encode.NotEnoughPeersError
2007-03-30 21:54:33 +00:00
# we need to keep trying
return self._locate_more_shareholders()
def _create_ring_of_things(self):
PEER = 1 # must sort later than SHARE, for consistency with download
SHARE = 0
# ring_of_things is a list of (position_in_ring, whatami, x) where
# whatami is SHARE if x is a sharenum or else PEER if x is a
# PeerTracker instance
ring_of_things = []
2007-03-30 21:54:33 +00:00
ring_of_things.extend([ (peer.permutedid, PEER, peer,)
for peer in self.usable_peers ])
shares = [ (i * 2**160 / self.total_shares, SHARE, i)
for i in self.unallocated_sharenums]
ring_of_things.extend(shares)
ring_of_things.sort()
ring_of_things = collections.deque(ring_of_things)
return ring_of_things
2007-03-30 03:19:52 +00:00
def _query_peers(self):
"""
@return: a deferred that fires when all queries have resolved
"""
2007-03-30 21:54:33 +00:00
PEER = 1
SHARE = 0
ring = self._create_ring_of_things()
2007-03-30 03:19:52 +00:00
# Choose a random starting point, talk to that peer.
2007-03-30 21:54:33 +00:00
ring.rotate(random.randrange(0, len(ring)))
2007-03-30 03:19:52 +00:00
# Walk backwards to find a peer. We know that we'll eventually find
# one because we earlier asserted that there was at least one.
2007-03-30 21:54:33 +00:00
while ring[0][1] != PEER:
ring.rotate(-1)
peer = ring[0][2]
2007-03-30 03:19:52 +00:00
assert isinstance(peer, PeerTracker), peer
2007-03-30 21:54:33 +00:00
ring.rotate(-1)
2007-03-30 03:19:52 +00:00
# loop invariant: at the top of the loop, we are always one step to
# the left of a peer, which is stored in the peer variable.
outstanding_queries = []
2007-03-30 21:54:33 +00:00
sharenums_to_query = set()
for i in range(len(ring)):
if ring[0][1] == SHARE:
sharenums_to_query.add(ring[0][2])
else:
2007-07-13 22:09:01 +00:00
if True or sharenums_to_query:
d = peer.query(sharenums_to_query)
d.addCallbacks(self._got_response, self._got_error, callbackArgs=(peer, sharenums_to_query), errbackArgs=(peer,))
outstanding_queries.append(d)
d.addErrback(log.err)
2007-03-30 21:54:33 +00:00
peer = ring[0][2]
sharenums_to_query = set()
ring.rotate(-1)
2007-03-30 03:19:52 +00:00
return defer.DeferredList(outstanding_queries)
def _got_response(self, (alreadygot, allocated), peer, shares_we_requested):
"""
@type alreadygot: a set of sharenums
@type allocated: a set of sharenums
"""
# TODO: some future version of Foolscap might not convert inbound
# sets into sets.Set on us, even when we're using 2.4
alreadygot = set(alreadygot)
allocated = set(allocated)
#log.msg("%s._got_response(%s, %s, %s): "
# "self.unallocated_sharenums: %s, unhandled: %s"
# % (self, (alreadygot, allocated), peer, shares_we_requested,
# self.unallocated_sharenums,
# shares_we_requested - alreadygot - allocated))
2007-03-30 03:19:52 +00:00
self.unallocated_sharenums -= alreadygot
self.unallocated_sharenums -= allocated
if allocated:
self.used_peers.add(peer)
2007-03-30 03:19:52 +00:00
if shares_we_requested - alreadygot - allocated:
# Then he didn't accept some of the shares, so he's full.
#log.msg("%s._got_response(%s, %s, %s): "
# "self.unallocated_sharenums: %s, unhandled: %s HE'S FULL"
# % (self,
# (alreadygot, allocated), peer, shares_we_requested,
# self.unallocated_sharenums,
# shares_we_requested - alreadygot - allocated))
2007-03-30 03:19:52 +00:00
self.usable_peers.remove(peer)
def _got_error(self, f, peer):
2007-03-30 21:54:33 +00:00
log.msg("%s._got_error(%s, %s)" % (self, f, peer,))
self.usable_peers.remove(peer)
2007-03-30 03:19:52 +00:00
class CHKUploader:
peer_selector_class = Tahoe3PeerSelector
def __init__(self, client, uploadable, options={}):
self._client = client
self._uploadable = IUploadable(uploadable)
self._options = options
def set_params(self, encoding_parameters):
self._encoding_parameters = encoding_parameters
needed_shares, shares_of_happiness, total_shares = encoding_parameters
self.needed_shares = needed_shares
self.shares_of_happiness = shares_of_happiness
self.total_shares = total_shares
def start(self):
"""Start uploading the file.
This method returns a Deferred that will fire with the URI (a
string)."""
log.msg("starting upload of %s" % self._uploadable)
d = self._uploadable.get_size()
d.addCallback(self.setup_encoder)
d.addCallback(self._uploadable.get_encryption_key)
d.addCallback(self.setup_keys)
d.addCallback(self.locate_all_shareholders)
d.addCallback(self.set_shareholders)
d.addCallback(lambda res: self._encoder.start())
d.addCallback(self._compute_uri)
return d
def setup_encoder(self, size):
self._size = size
self._encoder = encode.Encoder(self._options)
self._encoder.set_size(size)
self._encoder.set_params(self._encoding_parameters)
self._encoder.set_uploadable(self._uploadable)
self._encoder.setup()
return self._encoder.get_serialized_params()
def setup_keys(self, key):
assert isinstance(key, str)
assert len(key) == 16 # AES-128
self._encryption_key = key
self._encoder.set_encryption_key(key)
storage_index = hashutil.storage_index_chk_hash(key)
assert isinstance(storage_index, str)
# There's no point to having the SI be longer than the key, so we
# specify that it is truncated to the same 128 bits as the AES key.
assert len(storage_index) == 16 # SHA-256 truncated to 128b
self._storage_index = storage_index
log.msg(" upload storage_index is [%s]" % (idlib.b2a(storage_index,)))
def locate_all_shareholders(self, ignored=None):
peer_selector = self.peer_selector_class()
share_size = self._encoder.get_share_size()
block_size = self._encoder.get_block_size()
num_segments = self._encoder.get_num_segments()
gs = peer_selector.get_shareholders
d = gs(self._client,
self._storage_index, share_size, block_size,
num_segments, self.total_shares, self.shares_of_happiness)
return d
def set_shareholders(self, used_peers):
2007-03-30 21:54:33 +00:00
"""
@param used_peers: a sequence of PeerTracker objects
"""
log.msg("_send_shares, used_peers is %s" % (used_peers,))
for peer in used_peers:
assert isinstance(peer, PeerTracker)
2007-03-30 03:19:52 +00:00
buckets = {}
for peer in used_peers:
buckets.update(peer.buckets)
assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
self._encoder.set_shareholders(buckets)
2007-06-08 22:59:16 +00:00
def _compute_uri(self, uri_extension_hash):
u = uri.CHKFileURI(key=self._encryption_key,
uri_extension_hash=uri_extension_hash,
needed_shares=self.needed_shares,
total_shares=self.total_shares,
size=self._size,
)
assert u.storage_index == self._storage_index
return u.to_string()
2006-12-01 09:54:28 +00:00
def read_this_many_bytes(uploadable, size, prepend_data=[]):
if size == 0:
return defer.succeed([])
d = uploadable.read(size)
def _got(data):
assert isinstance(data, list)
bytes = sum([len(piece) for piece in data])
assert bytes > 0
assert bytes <= size
remaining = size - bytes
if remaining:
return read_this_many_bytes(uploadable, remaining,
prepend_data + data)
return prepend_data + data
d.addCallback(_got)
return d
class LiteralUploader:
def __init__(self, client, uploadable, options={}):
self._client = client
self._uploadable = IUploadable(uploadable)
self._options = options
def set_params(self, encoding_parameters):
pass
def start(self):
d = self._uploadable.get_size()
d.addCallback(lambda size: read_this_many_bytes(self._uploadable, size))
d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
d.addCallback(lambda u: u.to_string())
return d
def close(self):
pass
class ConvergentUploadMixin:
# to use this, the class it is mixed in to must have a seekable
# filehandle named self._filehandle
def get_encryption_key(self, encoding_parameters):
f = self._filehandle
enckey_hasher = hashutil.key_hasher()
#enckey_hasher.update(encoding_parameters) # TODO
f.seek(0)
BLOCKSIZE = 64*1024
while True:
data = f.read(BLOCKSIZE)
if not data:
break
enckey_hasher.update(data)
enckey = enckey_hasher.digest()[:16]
f.seek(0)
return defer.succeed(enckey)
class NonConvergentUploadMixin:
def get_encryption_key(self, encoding_parameters):
return defer.succeed(os.urandom(16))
class FileHandle(ConvergentUploadMixin):
implements(IUploadable)
def __init__(self, filehandle):
self._filehandle = filehandle
def get_size(self):
self._filehandle.seek(0,2)
size = self._filehandle.tell()
self._filehandle.seek(0)
return defer.succeed(size)
def read(self, length):
return defer.succeed([self._filehandle.read(length)])
def close(self):
# the originator of the filehandle reserves the right to close it
pass
class FileName(FileHandle):
def __init__(self, filename):
FileHandle.__init__(self, open(filename, "rb"))
def close(self):
FileHandle.close(self)
self._filehandle.close()
class Data(FileHandle):
def __init__(self, data):
FileHandle.__init__(self, StringIO(data))
class Uploader(service.MultiService):
"""I am a service that allows file uploading.
"""
implements(IUploader)
name = "uploader"
uploader_class = CHKUploader
URI_LIT_SIZE_THRESHOLD = 55
DEFAULT_ENCODING_PARAMETERS = (25, 75, 100)
# this is a tuple of (needed, desired, total). 'needed' is the number of
# shares required to reconstruct a file. 'desired' means that we will
# abort an upload unless we can allocate space for at least this many.
# 'total' is the total number of shares created by encoding. If everybody
# has room then this is is how many we will upload.
2007-03-30 03:19:52 +00:00
def upload(self, uploadable, options={}):
# this returns the URI
assert self.parent
assert self.running
uploadable = IUploadable(uploadable)
d = uploadable.get_size()
def _got_size(size):
uploader_class = self.uploader_class
if size <= self.URI_LIT_SIZE_THRESHOLD:
uploader_class = LiteralUploader
uploader = uploader_class(self.parent, uploadable, options)
uploader.set_params(self.parent.get_encoding_parameters()
or self.DEFAULT_ENCODING_PARAMETERS)
return uploader.start()
d.addCallback(_got_size)
def _done(res):
uploadable.close()
return res
d.addBoth(_done)
return d
# utility functions
def upload_data(self, data, options={}):
return self.upload(Data(data), options)
def upload_filename(self, filename, options={}):
return self.upload(FileName(filename), options)
def upload_filehandle(self, filehandle, options={}):
return self.upload(FileHandle(filehandle), options)