tahoe-lafs/src/allmydata/upload.py

669 lines
26 KiB
Python
Raw Normal View History

import os
from zope.interface import implements
from twisted.python import log, failure
2006-12-01 09:54:28 +00:00
from twisted.internet import defer
from twisted.application import service
from foolscap import Referenceable
from allmydata.util.hashutil import file_renewal_secret_hash, \
file_cancel_secret_hash, bucket_renewal_secret_hash, \
bucket_cancel_secret_hash, plaintext_hasher, \
storage_index_chk_hash, plaintext_segment_hasher, key_hasher
from allmydata import encode, storage, hashtree, uri
from allmydata.util import idlib, mathutil
from allmydata.interfaces import IUploadable, IUploader, IEncryptedUploadable
from allmydata.Crypto.Cipher import AES
2006-12-01 09:54:28 +00:00
2006-12-03 03:31:43 +00:00
from cStringIO import StringIO
2006-12-01 09:54:28 +00:00
class HaveAllPeersError(Exception):
    """Control-flow exception: we use this to jump out of the loop.

    NOTE(review): nothing in the visible part of this file raises or
    catches it -- confirm it is still needed.
    """
# this wants to live in storage, not here
class TooFullError(Exception):
    """Exception type declared for the 'too full' condition.

    NOTE(review): no raiser is visible in this part of the file.
    """
2007-07-13 22:09:01 +00:00
# Number of bytes to reserve for the uri_extension block of each share.
#
# Our current uri_extension is 846 bytes for small files, and a few bytes
# more for larger ones (since the filesize is encoded in decimal in a few
# places). We ask for a little bit more than that just in case we need it.
# If the extension changes size, EXTENSION_SIZE can be changed to allocate
# a more accurate amount of space.
#
# PeerTracker passes this value both to storage.allocated_size() and to
# storage.WriteBucketProxy().
EXTENSION_SIZE = 1000
2007-03-30 03:19:52 +00:00
class PeerTracker:
    """Track one storage peer while shares are being placed for an upload.

    A PeerTracker remembers the parameters needed to ask its peer for share
    space (storage index, lease secrets, sizes) and collects, in
    self.buckets, a WriteBucketProxy for every share the peer agreed to
    hold.
    """

    def __init__(self, peerid, permutedid, connection,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 storage_index,
                 bucket_renewal_secret, bucket_cancel_secret):
        self.peerid = peerid
        self.permutedid = permutedid
        self.connection = connection # to an RIClient
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize
        #print "PeerTracker", peerid, permutedid, sharesize
        # Note: this local used to be called 'as', which is a reserved word
        # from Python 2.6 onwards and made the whole module a SyntaxError.
        allocated = storage.allocated_size(sharesize,
                                           num_segments,
                                           num_share_hashes,
                                           EXTENSION_SIZE)
        self.allocated_size = allocated

        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index
        # filled in lazily by the first call to query()
        self._storageserver = None

        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

    def __repr__(self):
        return ("<PeerTracker for peer %s and SI %s>"
                % (idlib.b2a(self.peerid)[:4],
                   idlib.b2a(self.storage_index)[:6]))

    def query(self, sharenums):
        """Ask the peer to allocate buckets for the given share numbers.

        The first call fetches the peer's "storageserver" service and
        remembers it; later calls reuse it.

        @return: the Deferred produced by _query(), which fires with the
                 result of _got_reply()
        """
        if not self._storageserver:
            d = self.connection.callRemote("get_service", "storageserver")
            d.addCallback(self._got_storageserver)
            d.addCallback(lambda res: self._query(sharenums))
            return d
        return self._query(sharenums)

    def _got_storageserver(self, storageserver):
        self._storageserver = storageserver

    def _query(self, sharenums):
        #print " query", self.peerid, len(sharenums)
        d = self._storageserver.callRemote("allocate_buckets",
                                           self.storage_index,
                                           self.renew_secret,
                                           self.cancel_secret,
                                           sharenums,
                                           self.allocated_size,
                                           canary=Referenceable())
        d.addCallback(self._got_reply)
        return d

    def _got_reply(self, res):
        """Record the buckets granted by an allocate_buckets reply.

        @param res: an (alreadygot, buckets) pair; buckets maps share number
                    to a remote reference
        @return: (alreadygot, set of the share numbers in buckets)
        """
        # Unpacked here rather than in the signature: tuple parameters are
        # Python-2-only syntax, and callers pass a single tuple either way.
        (alreadygot, buckets) = res
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        proxies = {}
        for sharenum, rref in buckets.items():
            proxies[sharenum] = storage.WriteBucketProxy(rref, self.sharesize,
                                                         self.blocksize,
                                                         self.num_segments,
                                                         self.num_share_hashes,
                                                         EXTENSION_SIZE)
        self.buckets.update(proxies)
        return (alreadygot, set(proxies))
class Tahoe2PeerSelector:
    """Find storage peers willing to hold the shares of a single upload.

    get_shareholders() builds one PeerTracker per candidate peer and then
    alternates between _loop() (send the next query) and _got_response()
    (digest the answer) until every share has a home or there is nobody
    left to ask.
    """

    def __init__(self, upload_id):
        self.upload_id = upload_id
        # statistics, reported in the messages built by _loop()
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        self.error_count = 0
        self.num_peers_contacted = 0
        self.last_failure_msg = None

    def __repr__(self):
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, client,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, shares_of_happiness,
                         push_to_ourselves):
        """
        @return: a Deferred that fires with a set of PeerTracker instances
                 that have agreed to hold some shares for us
        @raise encode.NotEnoughPeersError: (synchronously) if the client
                 reports no peers at all
        """

        self.total_shares = total_shares
        self.shares_of_happiness = shares_of_happiness

        # A real list is required: shares are popped and slice-assigned.
        self.homeless_shares = list(range(total_shares))
        # self.uncontacted_peers = list() # peers we haven't asked yet
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # sharenum -> PeerTracker holding the share

        peers = client.get_permuted_peers(storage_index, push_to_ourselves)
        if not peers:
            raise encode.NotEnoughPeersError("client gave us zero peers")

        # figure out how much space to ask for

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = client.get_renewal_secret()
        client_cancel_secret = client.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                       storage_index)
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                     storage_index)

        trackers = [ PeerTracker(peerid, permutedid, conn,
                                 share_size, block_size,
                                 num_segments, num_share_hashes,
                                 storage_index,
                                 bucket_renewal_secret_hash(file_renewal_secret,
                                                            peerid),
                                 bucket_cancel_secret_hash(file_cancel_secret,
                                                           peerid),
                                 )
                     for permutedid, peerid, conn in peers ]
        self.uncontacted_peers = trackers

        d = defer.maybeDeferred(self._loop)
        return d

    def _start_next_pass(self):
        """Move the peers parked in contacted_peers2 back to contacted_peers.

        Called when a second-or-later pass has finished. The list is emptied
        in place so that the list object keeps its identity.
        """
        self.contacted_peers.extend(self.contacted_peers2)
        # This used to clear self.contacted_peers instead, which threw away
        # the peers that had just been moved and left contacted_peers2
        # non-empty, so _loop() re-entered this branch forever.
        self.contacted_peers2[:] = []

    def _loop(self):
        """Send the next query, or finish.

        @return: self.use_peers when placement is finished, otherwise the
                 Deferred of the query that was just sent
        @raise encode.NotEnoughPeersError: when no peers are left and fewer
                 than shares_of_happiness shares were placed
        """
        if not self.homeless_shares:
            # all done
            msg = ("placed all %d shares, "
                   "sent %d queries to %d peers, "
                   "%d queries placed some shares, %d placed none, "
                   "got %d errors" %
                   (self.total_shares,
                    self.query_count, self.num_peers_contacted,
                    self.good_query_count, self.bad_query_count,
                    self.error_count))
            log.msg("peer selection successful for %s: %s" % (self, msg))
            return self.use_peers

        if self.uncontacted_peers:
            # first pass: one share per peer we have not talked to yet
            peer = self.uncontacted_peers.pop(0)
            # TODO: don't pre-convert all peerids to PeerTrackers
            assert isinstance(peer, PeerTracker)

            shares_to_ask = set([self.homeless_shares.pop(0)])
            self.query_count += 1
            self.num_peers_contacted += 1
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers)
            return d
        elif self.contacted_peers:
            # ask a peer that we've already asked.
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_peers))
            peer = self.contacted_peers.pop(0)
            shares_to_ask = set(self.homeless_shares[:num_shares])
            self.homeless_shares[:num_shares] = []
            self.query_count += 1
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers2)
            return d
        elif self.contacted_peers2:
            # we've finished the second-or-later pass. Move all the remaining
            # peers back into self.contacted_peers for the next pass.
            self._start_next_pass()
            return self._loop()
        else:
            # no more peers. If we haven't placed enough shares, we fail.
            placed_shares = self.total_shares - len(self.homeless_shares)
            if placed_shares < self.shares_of_happiness:
                msg = ("placed %d shares out of %d total (%d homeless), "
                       "sent %d queries to %d peers, "
                       "%d queries placed some shares, %d placed none, "
                       "got %d errors" %
                       (self.total_shares - len(self.homeless_shares),
                        self.total_shares, len(self.homeless_shares),
                        self.query_count, self.num_peers_contacted,
                        self.good_query_count, self.bad_query_count,
                        self.error_count))
                msg = "peer selection failed for %s: %s" % (self, msg)
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                log.msg(msg)
                raise encode.NotEnoughPeersError(msg)
            else:
                # we placed enough to be happy, so we're done
                return self.use_peers

    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        """Digest the outcome of one query, then continue with _loop().

        @param res: a Failure, or the (alreadygot, allocated) pair returned
                    by the peer's query
        @param peer: the PeerTracker that was queried
        @param shares_to_ask: the share numbers that were requested
        @param put_peer_here: list to append the peer to if it accepted
                    everything and is therefore worth asking again
        """
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            log.msg("%s got error during peer selection: %s" % (peer, res))
            self.error_count += 1
            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
            out_of_peers = not (self.uncontacted_peers
                                or self.contacted_peers
                                or self.contacted_peers2)
            if out_of_peers:
                # No more peers, so this upload might fail (it depends upon
                # whether we've hit shares_of_happiness or not). Log the last
                # failure we got: if a coding error causes all peers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (peer, res))
                self.last_failure_msg = msg
            # otherwise there is still hope, so just loop
        else:
            (alreadygot, allocated) = res
            progress = False
            for s in alreadygot:
                self.preexisting_shares[s] = peer
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                    progress = True

            # the PeerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            if allocated:
                self.use_peers.add(peer)
                progress = True

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            if progress:
                # they accepted or already had at least one share, so
                # progress has been made
                self.good_query_count += 1
            else:
                self.bad_query_count += 1

            if still_homeless:
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with peers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other peers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares = (list(still_homeless)
                                        + self.homeless_shares)
                # Since they were unable to accept all of our requests, it
                # is safe to assume that asking them again won't help.
            else:
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_peer_here.append(peer)

        # now loop
        return self._loop()
class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable.

    While encrypting, it also feeds the plaintext to a whole-file hasher
    and to per-segment hashers, so the plaintext hashes can be reported
    once the data has been read.
    """
    implements(IEncryptedUploadable)

    def __init__(self, original):
        self.original = original
        self._encryptor = None
        self._plaintext_hasher = plaintext_hasher()
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._params = None

    def get_size(self):
        return self.original.get_size()

    def set_serialized_encoding_parameters(self, params):
        self._params = params

    def _get_encryptor(self):
        """Return a Deferred firing with the AES encryptor, creating it (and
        computing self._storage_index) on first use."""
        if self._encryptor:
            return defer.succeed(self._encryptor)

        if self._params is not None:
            self.original.set_serialized_encoding_parameters(self._params)

        d = self.original.get_encryption_key()

        def _key_ready(key):
            encryptor = AES.new(key=key, mode=AES.MODE_CTR,
                                counterstart="\x00"*16)
            self._encryptor = encryptor

            si = storage_index_chk_hash(key)
            assert isinstance(si, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(si) == 16  # SHA-256 truncated to 128b
            self._storage_index = si

            return encryptor
        d.addCallback(_key_ready)
        return d

    def get_storage_index(self):
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
        return d

    def set_segment_size(self, segsize):
        self._segment_size = segsize

    def _get_segment_hasher(self):
        """Return (hasher, bytes still missing from the current segment),
        starting a fresh segment hasher when none is active."""
        hasher = self._plaintext_segment_hasher
        if not hasher:
            hasher = plaintext_segment_hasher()
            self._plaintext_segment_hasher = hasher
            self._plaintext_segment_hashed_bytes = 0
            return hasher, self._segment_size
        remaining = self._segment_size - self._plaintext_segment_hashed_bytes
        return hasher, remaining

    def _update_segment_hash(self, chunk):
        """Feed chunk to the segment hashers, closing out each segment as
        soon as it has received self._segment_size bytes."""
        chunk_len = len(chunk)
        pos = 0
        while pos < chunk_len:
            hasher, segment_left = self._get_segment_hasher()
            take = min(chunk_len - pos, segment_left)
            hasher.update(chunk[pos:pos+take])
            self._plaintext_segment_hashed_bytes += take

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(hasher.digest())
                self._plaintext_segment_hasher = None

            pos += take

    def read_encrypted(self, length):
        """Read from the wrapped uploadable and return a Deferred that fires
        with a list of encrypted chunks."""
        def _hash_and_encrypt(data):
            assert isinstance(data, (tuple, list)), type(data)
            data = list(data)
            encrypted = []
            # we use data.pop(0) instead of 'for chunk in data' to save
            # memory: each chunk is destroyed as soon as we're done with it.
            while data:
                chunk = data.pop(0)
                self._plaintext_hasher.update(chunk)
                self._update_segment_hash(chunk)
                encrypted.append(self._encryptor.encrypt(chunk))
                del chunk
            return encrypted

        d = self._get_encryptor()
        d.addCallback(lambda res: self.original.read(length))
        d.addCallback(_hash_and_encrypt)
        return d

    def get_plaintext_segment_hashtree_nodes(self, num_segments):
        """Return a Deferred firing with the nodes of the hash tree built
        over the per-segment plaintext hashes."""
        hashes = self._plaintext_segment_hashes
        if len(hashes) < num_segments:
            # close out the last one
            assert len(hashes) == num_segments-1
            hasher, segment_left = self._get_segment_hasher()
            hashes.append(hasher.digest())
            del self._plaintext_segment_hasher
        assert len(hashes) == num_segments
        tree = hashtree.HashTree(hashes)
        return defer.succeed(list(tree))

    def get_plaintext_hash(self):
        digest = self._plaintext_hasher.digest()
        return defer.succeed(digest)
class CHKUploader:
    """Upload one file as encoded, encrypted shares and produce its CHK URI.

    The peer-selection strategy is pluggable through peer_selector_class.
    """
    peer_selector_class = Tahoe2PeerSelector

    def __init__(self, client, options=None, wait_for_numpeers=None):
        """
        @param client: object providing log(), introducer_client and the
                       methods the peer selector calls on it
        @param options: dict of upload options; a fresh dict is used when
                        omitted so instances never share a default
        @param wait_for_numpeers: None, or the (int) number of peers to wait
                        for before selecting shareholders
        """
        assert wait_for_numpeers is None or isinstance(wait_for_numpeers, int), wait_for_numpeers
        if options is None:
            options = {}
        self._client = client
        self._wait_for_numpeers = wait_for_numpeers
        self._options = options
        self._log_number = self._client.log("CHKUploader starting")

    def set_params(self, encoding_parameters):
        self._encoding_parameters = encoding_parameters

    def log(self, msg, parent=None):
        """Log msg through the client, by default as a child of the
        'CHKUploader starting' entry."""
        if parent is None:
            parent = self._log_number
        return self._client.log(msg, parent=parent)

    def start(self, uploadable):
        """Start uploading the file.

        This method returns a Deferred that will fire with the URI (a
        string)."""

        uploadable = IUploadable(uploadable)
        self.log("starting upload of %s" % uploadable)

        eu = EncryptAnUploadable(uploadable)
        d = self.start_encrypted(eu)
        def _uploaded(res):
            d1 = uploadable.get_encryption_key()
            d1.addCallback(lambda key: self._compute_uri(res, key))
            return d1
        d.addCallback(_uploaded)
        return d

    def start_encrypted(self, encrypted):
        """Encode and push an IEncryptedUploadable.

        @return: a Deferred; it fires with the uri_extension_hash and other
                 data (the result of Encoder.start())
        """
        eu = IEncryptedUploadable(encrypted)

        e = encode.Encoder(self._options, self)
        e.set_params(self._encoding_parameters)
        d = e.set_encrypted_uploadable(eu)
        def _wait_for_peers(res):
            wait_for_numpeers = self._wait_for_numpeers
            if wait_for_numpeers is None:
                # wait_for_numpeers = e.get_param("share_counts")[0] # XXX
                wait_for_numpeers = 1
            d1 = self._client.introducer_client.when_enough_peers(wait_for_numpeers)
            # pass the previous result through unchanged
            d1.addCallback(lambda dummy: res)
            return d1
        d.addCallback(_wait_for_peers)
        d.addCallback(self.locate_all_shareholders)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        # this fires with the uri_extension_hash and other data
        return d

    def locate_all_shareholders(self, encoder):
        """Run the peer selector with the encoder's parameters.

        @return: whatever peer_selector.get_shareholders() returns
        """
        storage_index = encoder.get_param("storage_index")
        upload_id = idlib.b2a(storage_index)[:6]
        self.log("using storage index %s" % upload_id)
        peer_selector = self.peer_selector_class(upload_id)

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")
        push_to_ourselves = self._options.get("push_to_ourselves", False)

        gs = peer_selector.get_shareholders
        d = gs(self._client, storage_index, share_size, block_size,
               num_segments, n, desired, push_to_ourselves)
        return d

    def set_shareholders(self, used_peers, encoder):
        """
        @param used_peers: a sequence of PeerTracker objects
        """
        self.log("_send_shares, used_peers is %s" % (used_peers,))
        for peer in used_peers:
            assert isinstance(peer, PeerTracker)
        buckets = {}
        for peer in used_peers:
            buckets.update(peer.buckets)
        # every share number must come from exactly one peer
        assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
        encoder.set_shareholders(buckets)

    def _compute_uri(self, res, key):
        """Build the CHK URI string.

        @param res: (uri_extension_hash, needed_shares, total_shares, size)
        @param key: the encryption key
        """
        # Unpacked here rather than in the signature: tuple parameters are
        # Python-2-only syntax, and the caller passes one tuple either way.
        (uri_extension_hash, needed_shares, total_shares, size) = res
        u = uri.CHKFileURI(key=key,
                           uri_extension_hash=uri_extension_hash,
                           needed_shares=needed_shares,
                           total_shares=total_shares,
                           size=size,
                           )
        return u.to_string()
2006-12-01 09:54:28 +00:00
def read_this_many_bytes(uploadable, size, prepend_data=None):
    """Read exactly `size` bytes from `uploadable`, re-reading as needed.

    @param uploadable: object whose read(n) returns a Deferred firing with a
                       list of strings
    @param size: total number of bytes wanted
    @param prepend_data: pieces read so far (used by the recursive calls);
                       the list passed in is never modified
    @return: a Deferred firing with a list of pieces; an empty list when
             size is 0
    """
    if prepend_data is None:
        prepend_data = []
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        num_read = sum([len(piece) for piece in data])
        # a read that makes no progress would otherwise recurse forever
        assert num_read > 0
        assert num_read <= size
        so_far = prepend_data + data
        remaining = size - num_read
        if remaining:
            return read_this_many_bytes(uploadable, remaining, so_far)
        return so_far
    d.addCallback(_got)
    return d
class LiteralUploader:
    """Uploader for small files: the whole content is packed into the URI
    itself (a LiteralFileURI) instead of being sent to peers."""

    def __init__(self, client, wait_for_numpeers, options=None):
        """
        @param client: stored as self._client
        @param wait_for_numpeers: accepted but ignored
        @param options: dict of upload options; a fresh dict is used when
                        omitted so instances never share a default

        NOTE: the positional order here (client, wait_for_numpeers, options)
        differs from CHKUploader's (client, options, wait_for_numpeers);
        callers that may construct either class should pass these by
        keyword.
        """
        if options is None:
            options = {}
        self._client = client
        self._options = options

    def set_params(self, encoding_parameters):
        # literal files are not encoded, so there is nothing to remember
        pass

    def start(self, uploadable):
        """Read the whole uploadable and return a Deferred that fires with
        the literal URI string."""
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        d.addCallback(lambda size: read_this_many_bytes(uploadable, size))
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        return d

    def close(self):
        pass
class ConvergentUploadMixin:
    """Derive the encryption key from the file's own content.

    To use this, the class it is mixed in to must have a seekable
    filehandle named self._filehandle.
    """
    _params = None
    _key = None

    def set_serialized_encoding_parameters(self, params):
        self._params = params
        # ignored for now

    def get_encryption_key(self):
        """Return a Deferred firing with the 16-byte key; the key is
        computed on the first call and cached in self._key."""
        if self._key is not None:
            return defer.succeed(self._key)

        BLOCKSIZE = 64*1024
        fh = self._filehandle
        hasher = key_hasher()
        #hasher.update(encoding_parameters) # TODO
        fh.seek(0)
        data = fh.read(BLOCKSIZE)
        while data:
            hasher.update(data)
            data = fh.read(BLOCKSIZE)
        # leave the filehandle rewound for whoever reads it next
        fh.seek(0)
        self._key = hasher.digest()[:16]
        return defer.succeed(self._key)
class NonConvergentUploadMixin:
    """Use a random encryption key instead of one derived from content."""
    _key = None

    def set_serialized_encoding_parameters(self, params):
        # a random key does not depend on the encoding parameters
        pass

    def get_encryption_key(self):
        """Return a Deferred firing with a 16-byte key from os.urandom;
        the key is generated on the first call and then reused."""
        key = self._key
        if key is None:
            key = self._key = os.urandom(16)
        return defer.succeed(key)
class FileHandle(ConvergentUploadMixin):
    """IUploadable backed by an already-open, seekable filehandle."""
    implements(IUploadable)

    def __init__(self, filehandle):
        self._filehandle = filehandle

    def get_size(self):
        """Return a Deferred firing with the file size; the filehandle is
        left positioned at the start."""
        fh = self._filehandle
        fh.seek(0, 2)  # whence=2: relative to the end of the file
        size = fh.tell()
        fh.seek(0)
        return defer.succeed(size)

    def read(self, length):
        """Return a Deferred firing with a one-element list holding up to
        `length` bytes."""
        piece = self._filehandle.read(length)
        return defer.succeed([piece])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass
class FileName(FileHandle):
    """FileHandle that opens the named file itself (in binary mode) and
    therefore also closes it."""

    def __init__(self, filename):
        opened = open(filename, "rb")
        FileHandle.__init__(self, opened)

    def close(self):
        FileHandle.close(self)
        # we opened this filehandle, so closing it is our job
        self._filehandle.close()
class Data(FileHandle):
    """FileHandle over an in-memory string, wrapped in a StringIO."""

    def __init__(self, data):
        buf = StringIO(data)
        FileHandle.__init__(self, buf)
class Uploader(service.MultiService):
    """I am a service that allows file uploading.
    """
    implements(IUploader)
    name = "uploader"
    uploader_class = CHKUploader
    # files of at most this many bytes are uploaded with LiteralUploader
    URI_LIT_SIZE_THRESHOLD = 55

    DEFAULT_ENCODING_PARAMETERS = (25, 75, 100)
    # this is a tuple of (needed, desired, total). 'needed' is the number of
    # shares required to reconstruct a file. 'desired' means that we will
    # abort an upload unless we can allocate space for at least this many.
    # 'total' is the total number of shares created by encoding. If everybody
    # has room then this is how many we will upload.

    def upload(self, uploadable, options=None, wait_for_numpeers=None):
        """Upload an IUploadable.

        @param uploadable: something adaptable to IUploadable; it is closed
                           when the upload finishes, successfully or not
        @param options: optional dict of upload options; it is copied, never
                        modified
        @param wait_for_numpeers: None or an int, handed to the uploader
        @return: a Deferred that fires with the URI
        """
        assert wait_for_numpeers is None or isinstance(wait_for_numpeers, int), wait_for_numpeers
        # this returns the URI
        assert self.parent
        assert self.running

        # Work on a private copy: push_to_ourselves is written into it
        # below, and that must not leak into the caller's dict (or into a
        # default shared between calls).
        if options is None:
            options = {}
        else:
            options = dict(options)
        push_to_ourselves = self.parent.get_push_to_ourselves()
        if push_to_ourselves is not None:
            options["push_to_ourselves"] = push_to_ourselves

        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            uploader_class = self.uploader_class
            if size <= self.URI_LIT_SIZE_THRESHOLD:
                uploader_class = LiteralUploader
            # Passed by keyword: CHKUploader and LiteralUploader take
            # 'options' and 'wait_for_numpeers' in different positions.
            uploader = uploader_class(self.parent,
                                      options=options,
                                      wait_for_numpeers=wait_for_numpeers)
            uploader.set_params(self.parent.get_encoding_parameters()
                                or self.DEFAULT_ENCODING_PARAMETERS)
            return uploader.start(uploadable)
        d.addCallback(_got_size)
        def _done(res):
            uploadable.close()
            return res
        d.addBoth(_done)
        return d

    # utility functions
    def upload_data(self, data, options=None):
        return self.upload(Data(data), options)
    def upload_filename(self, filename, options=None):
        return self.upload(FileName(filename), options)
    def upload_filehandle(self, filehandle, options=None):
        return self.upload(FileHandle(filehandle), options)