2007-07-20 01:21:44 +00:00
|
|
|
|
|
|
|
import os
|
2007-01-21 22:01:34 +00:00
|
|
|
from zope.interface import implements
|
2007-03-30 03:19:52 +00:00
|
|
|
from twisted.python import log
|
2006-12-01 09:54:28 +00:00
|
|
|
from twisted.internet import defer
|
2006-12-03 01:27:18 +00:00
|
|
|
from twisted.application import service
|
2006-12-04 02:07:41 +00:00
|
|
|
from foolscap import Referenceable
|
2006-12-03 01:27:18 +00:00
|
|
|
|
2007-07-24 02:31:53 +00:00
|
|
|
from allmydata.util import hashutil
|
2007-07-21 22:40:36 +00:00
|
|
|
from allmydata import encode, storage, hashtree, uri
|
2007-07-24 02:31:53 +00:00
|
|
|
from allmydata.interfaces import IUploadable, IUploader, IEncryptedUploadable
|
|
|
|
from allmydata.Crypto.Cipher import AES
|
2006-12-01 09:54:28 +00:00
|
|
|
|
2006-12-03 03:31:43 +00:00
|
|
|
from cStringIO import StringIO
|
2007-06-08 04:47:21 +00:00
|
|
|
import collections, random
|
2006-12-03 03:31:43 +00:00
|
|
|
|
2007-07-12 20:22:36 +00:00
|
|
|
|
2006-12-01 09:54:28 +00:00
|
|
|
class HaveAllPeersError(Exception):
|
|
|
|
# we use this to jump out of the loop
|
|
|
|
pass
|
|
|
|
|
|
|
|
# this wants to live in storage, not here
|
|
|
|
class TooFullError(Exception):
|
|
|
|
pass
|
|
|
|
|
2007-07-13 22:09:01 +00:00
|
|
|
# our current uri_extension is 846 bytes for small files, a few bytes
|
|
|
|
# more for larger ones (since the filesize is encoded in decimal in a
|
|
|
|
# few places). Ask for a little bit more just in case we need it. If
|
|
|
|
# the extension changes size, we can change EXTENSION_SIZE to
|
|
|
|
# allocate a more accurate amount of space.
|
|
|
|
EXTENSION_SIZE = 1000
|
|
|
|
|
2007-03-30 03:19:52 +00:00
|
|
|
class PeerTracker:
|
2007-06-10 03:46:04 +00:00
|
|
|
def __init__(self, peerid, permutedid, connection,
|
2007-07-13 21:04:49 +00:00
|
|
|
sharesize, blocksize, num_segments, num_share_hashes,
|
|
|
|
crypttext_hash):
|
2007-03-30 03:19:52 +00:00
|
|
|
self.peerid = peerid
|
2007-03-30 21:54:33 +00:00
|
|
|
self.permutedid = permutedid
|
2007-03-30 23:50:50 +00:00
|
|
|
self.connection = connection # to an RIClient
|
2007-03-30 03:19:52 +00:00
|
|
|
self.buckets = {} # k: shareid, v: IRemoteBucketWriter
|
|
|
|
self.sharesize = sharesize
|
2007-07-13 22:09:01 +00:00
|
|
|
#print "PeerTracker", peerid, permutedid, sharesize
|
2007-07-14 00:25:45 +00:00
|
|
|
as = storage.allocated_size(sharesize,
|
|
|
|
num_segments,
|
|
|
|
num_share_hashes,
|
|
|
|
EXTENSION_SIZE)
|
2007-07-13 22:09:01 +00:00
|
|
|
self.allocated_size = as
|
|
|
|
|
2007-03-30 03:19:52 +00:00
|
|
|
self.blocksize = blocksize
|
2007-07-13 21:04:49 +00:00
|
|
|
self.num_segments = num_segments
|
|
|
|
self.num_share_hashes = num_share_hashes
|
2007-06-10 03:46:04 +00:00
|
|
|
self.crypttext_hash = crypttext_hash
|
2007-03-30 23:50:50 +00:00
|
|
|
self._storageserver = None
|
2007-03-30 03:19:52 +00:00
|
|
|
|
|
|
|
def query(self, sharenums):
|
2007-03-30 23:50:50 +00:00
|
|
|
if not self._storageserver:
|
|
|
|
d = self.connection.callRemote("get_service", "storageserver")
|
|
|
|
d.addCallback(self._got_storageserver)
|
|
|
|
d.addCallback(lambda res: self._query(sharenums))
|
|
|
|
return d
|
|
|
|
return self._query(sharenums)
|
|
|
|
def _got_storageserver(self, storageserver):
|
|
|
|
self._storageserver = storageserver
|
|
|
|
def _query(self, sharenums):
|
2007-07-13 22:09:01 +00:00
|
|
|
#print " query", self.peerid, len(sharenums)
|
2007-06-10 03:46:04 +00:00
|
|
|
d = self._storageserver.callRemote("allocate_buckets",
|
|
|
|
self.crypttext_hash,
|
2007-07-13 22:09:01 +00:00
|
|
|
sharenums,
|
|
|
|
self.allocated_size,
|
2007-06-10 03:46:04 +00:00
|
|
|
canary=Referenceable())
|
2007-03-30 03:19:52 +00:00
|
|
|
d.addCallback(self._got_reply)
|
|
|
|
return d
|
|
|
|
|
|
|
|
def _got_reply(self, (alreadygot, buckets)):
|
2007-04-18 03:25:52 +00:00
|
|
|
#log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
|
2007-07-13 21:04:49 +00:00
|
|
|
b = {}
|
|
|
|
for sharenum, rref in buckets.iteritems():
|
2007-07-14 00:25:45 +00:00
|
|
|
bp = storage.WriteBucketProxy(rref, self.sharesize,
|
|
|
|
self.blocksize,
|
|
|
|
self.num_segments,
|
|
|
|
self.num_share_hashes,
|
|
|
|
EXTENSION_SIZE)
|
2007-07-13 21:04:49 +00:00
|
|
|
b[sharenum] = bp
|
2007-07-09 06:27:46 +00:00
|
|
|
self.buckets.update(b)
|
|
|
|
return (alreadygot, set(b.keys()))
|
2007-01-16 04:22:22 +00:00
|
|
|
|
2007-07-20 01:21:44 +00:00
|
|
|
class Tahoe3PeerSelector:
|
2007-01-16 04:22:22 +00:00
|
|
|
|
2007-07-20 01:21:44 +00:00
|
|
|
def get_shareholders(self, client,
|
|
|
|
storage_index, share_size, block_size,
|
2007-08-10 01:30:24 +00:00
|
|
|
num_segments, total_shares, shares_of_happiness,
|
|
|
|
push_to_ourselves):
|
2007-06-12 02:21:51 +00:00
|
|
|
"""
|
|
|
|
@return: a set of PeerTracker instances that have agreed to hold some
|
|
|
|
shares for us
|
|
|
|
"""
|
2007-07-20 01:21:44 +00:00
|
|
|
|
|
|
|
self.total_shares = total_shares
|
|
|
|
self.shares_of_happiness = shares_of_happiness
|
|
|
|
|
2007-03-30 18:53:03 +00:00
|
|
|
# we are responsible for locating the shareholders. self._encoder is
|
|
|
|
# responsible for handling the data and sending out the shares.
|
2007-08-10 01:30:24 +00:00
|
|
|
peers = client.get_permuted_peers(storage_index, push_to_ourselves)
|
|
|
|
|
|
|
|
assert peers, "peer selection left us with zero peers for our data"
|
2007-07-13 22:09:01 +00:00
|
|
|
|
|
|
|
# this needed_hashes computation should mirror
|
|
|
|
# Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
|
|
|
|
# (instead of a HashTree) because we don't require actual hashing
|
|
|
|
# just to count the levels.
|
2007-07-20 01:21:44 +00:00
|
|
|
ht = hashtree.IncompleteHashTree(total_shares)
|
2007-07-13 22:09:01 +00:00
|
|
|
num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
|
|
|
|
|
2007-06-10 03:46:04 +00:00
|
|
|
trackers = [ PeerTracker(peerid, permutedid, conn,
|
|
|
|
share_size, block_size,
|
2007-07-13 21:04:49 +00:00
|
|
|
num_segments, num_share_hashes,
|
2007-07-20 01:21:44 +00:00
|
|
|
storage_index)
|
2007-03-30 03:19:52 +00:00
|
|
|
for permutedid, peerid, conn in peers ]
|
2007-03-30 21:54:33 +00:00
|
|
|
self.usable_peers = set(trackers) # this set shrinks over time
|
|
|
|
self.used_peers = set() # while this set grows
|
2007-07-20 01:21:44 +00:00
|
|
|
self.unallocated_sharenums = set(range(total_shares)) # this one shrinks
|
2007-03-30 03:19:52 +00:00
|
|
|
|
2007-03-30 21:54:33 +00:00
|
|
|
return self._locate_more_shareholders()
|
|
|
|
|
|
|
|
def _locate_more_shareholders(self):
|
2007-03-30 03:19:52 +00:00
|
|
|
d = self._query_peers()
|
2007-03-30 21:54:33 +00:00
|
|
|
d.addCallback(self._located_some_shareholders)
|
2007-01-16 04:22:22 +00:00
|
|
|
return d
|
|
|
|
|
2007-03-30 21:54:33 +00:00
|
|
|
def _located_some_shareholders(self, res):
|
|
|
|
log.msg("_located_some_shareholders")
|
2007-07-20 01:21:44 +00:00
|
|
|
log.msg(" still need homes for %d shares, still have %d usable peers"
|
|
|
|
% (len(self.unallocated_sharenums), len(self.usable_peers)))
|
2007-03-30 21:54:33 +00:00
|
|
|
if not self.unallocated_sharenums:
|
|
|
|
# Finished allocating places for all shares.
|
2007-07-20 01:21:44 +00:00
|
|
|
log.msg("%s._locate_all_shareholders() "
|
|
|
|
"Finished allocating places for all shares." % self)
|
2007-03-30 21:54:33 +00:00
|
|
|
log.msg("used_peers is %s" % (self.used_peers,))
|
|
|
|
return self.used_peers
|
|
|
|
if not self.usable_peers:
|
|
|
|
# Ran out of peers who have space.
|
2007-07-20 01:21:44 +00:00
|
|
|
log.msg("%s._locate_all_shareholders() "
|
|
|
|
"Ran out of peers who have space." % self)
|
|
|
|
margin = self.total_shares - self.shares_of_happiness
|
|
|
|
if len(self.unallocated_sharenums) < margin:
|
2007-03-30 21:54:33 +00:00
|
|
|
# But we allocated places for enough shares.
|
2007-07-20 01:21:44 +00:00
|
|
|
log.msg("%s._locate_all_shareholders() "
|
|
|
|
"But we allocated places for enough shares.")
|
2007-03-30 21:54:33 +00:00
|
|
|
return self.used_peers
|
2007-06-08 05:20:55 +00:00
|
|
|
raise encode.NotEnoughPeersError
|
2007-03-30 21:54:33 +00:00
|
|
|
# we need to keep trying
|
|
|
|
return self._locate_more_shareholders()
|
|
|
|
|
|
|
|
def _create_ring_of_things(self):
|
|
|
|
PEER = 1 # must sort later than SHARE, for consistency with download
|
|
|
|
SHARE = 0
|
2007-07-20 01:21:44 +00:00
|
|
|
# ring_of_things is a list of (position_in_ring, whatami, x) where
|
|
|
|
# whatami is SHARE if x is a sharenum or else PEER if x is a
|
|
|
|
# PeerTracker instance
|
|
|
|
ring_of_things = []
|
2007-03-30 21:54:33 +00:00
|
|
|
ring_of_things.extend([ (peer.permutedid, PEER, peer,)
|
|
|
|
for peer in self.usable_peers ])
|
|
|
|
shares = [ (i * 2**160 / self.total_shares, SHARE, i)
|
|
|
|
for i in self.unallocated_sharenums]
|
|
|
|
ring_of_things.extend(shares)
|
|
|
|
ring_of_things.sort()
|
|
|
|
ring_of_things = collections.deque(ring_of_things)
|
|
|
|
return ring_of_things
|
|
|
|
|
2007-03-30 03:19:52 +00:00
|
|
|
def _query_peers(self):
|
|
|
|
"""
|
|
|
|
@return: a deferred that fires when all queries have resolved
|
|
|
|
"""
|
2007-03-30 21:54:33 +00:00
|
|
|
PEER = 1
|
|
|
|
SHARE = 0
|
|
|
|
ring = self._create_ring_of_things()
|
|
|
|
|
2007-03-30 03:19:52 +00:00
|
|
|
# Choose a random starting point, talk to that peer.
|
2007-03-30 21:54:33 +00:00
|
|
|
ring.rotate(random.randrange(0, len(ring)))
|
2007-03-30 03:19:52 +00:00
|
|
|
|
|
|
|
# Walk backwards to find a peer. We know that we'll eventually find
|
|
|
|
# one because we earlier asserted that there was at least one.
|
2007-03-30 21:54:33 +00:00
|
|
|
while ring[0][1] != PEER:
|
|
|
|
ring.rotate(-1)
|
|
|
|
peer = ring[0][2]
|
2007-03-30 03:19:52 +00:00
|
|
|
assert isinstance(peer, PeerTracker), peer
|
2007-03-30 21:54:33 +00:00
|
|
|
ring.rotate(-1)
|
2007-03-30 03:19:52 +00:00
|
|
|
|
|
|
|
# loop invariant: at the top of the loop, we are always one step to
|
|
|
|
# the left of a peer, which is stored in the peer variable.
|
|
|
|
outstanding_queries = []
|
2007-03-30 21:54:33 +00:00
|
|
|
sharenums_to_query = set()
|
|
|
|
for i in range(len(ring)):
|
|
|
|
if ring[0][1] == SHARE:
|
|
|
|
sharenums_to_query.add(ring[0][2])
|
|
|
|
else:
|
2007-07-13 22:09:01 +00:00
|
|
|
if True or sharenums_to_query:
|
|
|
|
d = peer.query(sharenums_to_query)
|
|
|
|
d.addCallbacks(self._got_response, self._got_error, callbackArgs=(peer, sharenums_to_query), errbackArgs=(peer,))
|
|
|
|
outstanding_queries.append(d)
|
|
|
|
d.addErrback(log.err)
|
2007-03-30 21:54:33 +00:00
|
|
|
peer = ring[0][2]
|
|
|
|
sharenums_to_query = set()
|
|
|
|
ring.rotate(-1)
|
|
|
|
|
2007-03-30 03:19:52 +00:00
|
|
|
return defer.DeferredList(outstanding_queries)
|
|
|
|
|
|
|
|
def _got_response(self, (alreadygot, allocated), peer, shares_we_requested):
|
|
|
|
"""
|
|
|
|
@type alreadygot: a set of sharenums
|
|
|
|
@type allocated: a set of sharenums
|
|
|
|
"""
|
2007-03-30 23:55:04 +00:00
|
|
|
# TODO: some future version of Foolscap might not convert inbound
|
|
|
|
# sets into sets.Set on us, even when we're using 2.4
|
|
|
|
alreadygot = set(alreadygot)
|
|
|
|
allocated = set(allocated)
|
2007-07-20 01:21:44 +00:00
|
|
|
#log.msg("%s._got_response(%s, %s, %s): "
|
|
|
|
# "self.unallocated_sharenums: %s, unhandled: %s"
|
|
|
|
# % (self, (alreadygot, allocated), peer, shares_we_requested,
|
|
|
|
# self.unallocated_sharenums,
|
|
|
|
# shares_we_requested - alreadygot - allocated))
|
2007-03-30 03:19:52 +00:00
|
|
|
self.unallocated_sharenums -= alreadygot
|
|
|
|
self.unallocated_sharenums -= allocated
|
|
|
|
|
|
|
|
if allocated:
|
2007-03-30 23:50:50 +00:00
|
|
|
self.used_peers.add(peer)
|
2007-03-30 03:19:52 +00:00
|
|
|
|
|
|
|
if shares_we_requested - alreadygot - allocated:
|
|
|
|
# Then he didn't accept some of the shares, so he's full.
|
2007-07-20 01:21:44 +00:00
|
|
|
|
|
|
|
#log.msg("%s._got_response(%s, %s, %s): "
|
|
|
|
# "self.unallocated_sharenums: %s, unhandled: %s HE'S FULL"
|
|
|
|
# % (self,
|
|
|
|
# (alreadygot, allocated), peer, shares_we_requested,
|
|
|
|
# self.unallocated_sharenums,
|
|
|
|
# shares_we_requested - alreadygot - allocated))
|
2007-03-30 03:19:52 +00:00
|
|
|
self.usable_peers.remove(peer)
|
|
|
|
|
|
|
|
def _got_error(self, f, peer):
|
2007-03-30 21:54:33 +00:00
|
|
|
log.msg("%s._got_error(%s, %s)" % (self, f, peer,))
|
|
|
|
self.usable_peers.remove(peer)
|
2007-03-30 03:19:52 +00:00
|
|
|
|
2007-07-20 01:21:44 +00:00
|
|
|
|
2007-07-24 02:31:53 +00:00
|
|
|
class EncryptAnUploadable:
|
|
|
|
"""This is a wrapper that takes an IUploadable and provides
|
|
|
|
IEncryptedUploadable."""
|
|
|
|
implements(IEncryptedUploadable)
|
|
|
|
|
|
|
|
def __init__(self, original):
|
|
|
|
self.original = original
|
|
|
|
self._encryptor = None
|
|
|
|
self._plaintext_hasher = hashutil.plaintext_hasher()
|
|
|
|
self._plaintext_segment_hasher = None
|
|
|
|
self._plaintext_segment_hashes = []
|
|
|
|
self._params = None
|
|
|
|
|
|
|
|
def get_size(self):
|
|
|
|
return self.original.get_size()
|
|
|
|
|
|
|
|
def set_serialized_encoding_parameters(self, params):
|
|
|
|
self._params = params
|
|
|
|
|
|
|
|
def _get_encryptor(self):
|
|
|
|
if self._encryptor:
|
|
|
|
return defer.succeed(self._encryptor)
|
|
|
|
|
|
|
|
if self._params is not None:
|
|
|
|
self.original.set_serialized_encoding_parameters(self._params)
|
|
|
|
|
|
|
|
d = self.original.get_encryption_key()
|
|
|
|
def _got(key):
|
|
|
|
e = AES.new(key=key, mode=AES.MODE_CTR, counterstart="\x00"*16)
|
|
|
|
self._encryptor = e
|
|
|
|
|
|
|
|
storage_index = hashutil.storage_index_chk_hash(key)
|
|
|
|
assert isinstance(storage_index, str)
|
|
|
|
# There's no point to having the SI be longer than the key, so we
|
|
|
|
# specify that it is truncated to the same 128 bits as the AES key.
|
|
|
|
assert len(storage_index) == 16 # SHA-256 truncated to 128b
|
|
|
|
self._storage_index = storage_index
|
|
|
|
|
|
|
|
return e
|
|
|
|
d.addCallback(_got)
|
|
|
|
return d
|
|
|
|
|
|
|
|
def get_storage_index(self):
|
|
|
|
d = self._get_encryptor()
|
|
|
|
d.addCallback(lambda res: self._storage_index)
|
|
|
|
return d
|
|
|
|
|
|
|
|
def set_segment_size(self, segsize):
|
|
|
|
self._segment_size = segsize
|
|
|
|
|
|
|
|
def _get_segment_hasher(self):
|
|
|
|
p = self._plaintext_segment_hasher
|
|
|
|
if p:
|
|
|
|
left = self._segment_size - self._plaintext_segment_hashed_bytes
|
|
|
|
return p, left
|
|
|
|
p = hashutil.plaintext_segment_hasher()
|
|
|
|
self._plaintext_segment_hasher = p
|
|
|
|
self._plaintext_segment_hashed_bytes = 0
|
|
|
|
return p, self._segment_size
|
|
|
|
|
|
|
|
def _update_segment_hash(self, chunk):
|
|
|
|
offset = 0
|
|
|
|
while offset < len(chunk):
|
|
|
|
p, segment_left = self._get_segment_hasher()
|
|
|
|
chunk_left = len(chunk) - offset
|
|
|
|
this_segment = min(chunk_left, segment_left)
|
|
|
|
p.update(chunk[offset:offset+this_segment])
|
|
|
|
self._plaintext_segment_hashed_bytes += this_segment
|
|
|
|
|
|
|
|
if self._plaintext_segment_hashed_bytes == self._segment_size:
|
|
|
|
# we've filled this segment
|
|
|
|
self._plaintext_segment_hashes.append(p.digest())
|
|
|
|
self._plaintext_segment_hasher = None
|
|
|
|
|
|
|
|
offset += this_segment
|
|
|
|
|
|
|
|
def read_encrypted(self, length):
|
|
|
|
d = self._get_encryptor()
|
|
|
|
d.addCallback(lambda res: self.original.read(length))
|
|
|
|
def _got(data):
|
|
|
|
assert isinstance(data, (tuple, list)), type(data)
|
|
|
|
data = list(data)
|
|
|
|
cryptdata = []
|
|
|
|
# we use data.pop(0) instead of 'for chunk in data' to save
|
|
|
|
# memory: each chunk is destroyed as soon as we're done with it.
|
|
|
|
while data:
|
|
|
|
chunk = data.pop(0)
|
|
|
|
self._plaintext_hasher.update(chunk)
|
|
|
|
self._update_segment_hash(chunk)
|
|
|
|
cryptdata.append(self._encryptor.encrypt(chunk))
|
|
|
|
del chunk
|
|
|
|
return cryptdata
|
|
|
|
d.addCallback(_got)
|
|
|
|
return d
|
|
|
|
|
|
|
|
def get_plaintext_segment_hashtree_nodes(self, num_segments):
|
|
|
|
if len(self._plaintext_segment_hashes) < num_segments:
|
|
|
|
# close out the last one
|
|
|
|
assert len(self._plaintext_segment_hashes) == num_segments-1
|
|
|
|
p, segment_left = self._get_segment_hasher()
|
|
|
|
self._plaintext_segment_hashes.append(p.digest())
|
|
|
|
del self._plaintext_segment_hasher
|
|
|
|
assert len(self._plaintext_segment_hashes) == num_segments
|
|
|
|
ht = hashtree.HashTree(self._plaintext_segment_hashes)
|
|
|
|
return defer.succeed(list(ht))
|
|
|
|
|
|
|
|
def get_plaintext_hash(self):
|
|
|
|
h = self._plaintext_hasher.digest()
|
|
|
|
return defer.succeed(h)
|
|
|
|
|
|
|
|
|
2007-07-20 01:21:44 +00:00
|
|
|
class CHKUploader:
|
|
|
|
peer_selector_class = Tahoe3PeerSelector
|
|
|
|
|
2007-07-24 02:31:53 +00:00
|
|
|
def __init__(self, client, options={}):
|
2007-07-20 01:21:44 +00:00
|
|
|
self._client = client
|
|
|
|
self._options = options
|
|
|
|
|
|
|
|
def set_params(self, encoding_parameters):
|
|
|
|
self._encoding_parameters = encoding_parameters
|
|
|
|
|
2007-07-24 02:31:53 +00:00
|
|
|
def start(self, uploadable):
|
2007-07-20 01:21:44 +00:00
|
|
|
"""Start uploading the file.
|
|
|
|
|
|
|
|
This method returns a Deferred that will fire with the URI (a
|
|
|
|
string)."""
|
|
|
|
|
2007-07-24 02:31:53 +00:00
|
|
|
uploadable = IUploadable(uploadable)
|
|
|
|
log.msg("starting upload of %s" % uploadable)
|
|
|
|
|
|
|
|
eu = EncryptAnUploadable(uploadable)
|
|
|
|
d = self.start_encrypted(eu)
|
|
|
|
def _uploaded(res):
|
|
|
|
d1 = uploadable.get_encryption_key()
|
|
|
|
d1.addCallback(lambda key: self._compute_uri(res, key))
|
|
|
|
return d1
|
|
|
|
d.addCallback(_uploaded)
|
|
|
|
return d
|
2007-07-20 01:21:44 +00:00
|
|
|
|
2007-07-24 02:31:53 +00:00
|
|
|
def start_encrypted(self, encrypted):
|
|
|
|
eu = IEncryptedUploadable(encrypted)
|
|
|
|
|
|
|
|
e = encode.Encoder(self._options)
|
|
|
|
e.set_params(self._encoding_parameters)
|
|
|
|
d = e.set_encrypted_uploadable(eu)
|
2007-07-20 01:21:44 +00:00
|
|
|
d.addCallback(self.locate_all_shareholders)
|
2007-07-24 02:31:53 +00:00
|
|
|
d.addCallback(self.set_shareholders, e)
|
|
|
|
d.addCallback(lambda res: e.start())
|
|
|
|
# this fires with the uri_extension_hash and other data
|
2007-07-20 01:21:44 +00:00
|
|
|
return d
|
|
|
|
|
2007-07-24 02:31:53 +00:00
|
|
|
def locate_all_shareholders(self, encoder):
|
2007-07-20 01:21:44 +00:00
|
|
|
peer_selector = self.peer_selector_class()
|
2007-07-24 02:31:53 +00:00
|
|
|
|
|
|
|
storage_index = encoder.get_param("storage_index")
|
|
|
|
share_size = encoder.get_param("share_size")
|
|
|
|
block_size = encoder.get_param("block_size")
|
|
|
|
num_segments = encoder.get_param("num_segments")
|
|
|
|
k,desired,n = encoder.get_param("share_counts")
|
2007-08-10 01:30:24 +00:00
|
|
|
push_to_ourselves = self._options.get("push_to_ourselves", False)
|
2007-07-24 02:31:53 +00:00
|
|
|
|
2007-07-20 01:21:44 +00:00
|
|
|
gs = peer_selector.get_shareholders
|
2007-07-24 02:31:53 +00:00
|
|
|
d = gs(self._client, storage_index, share_size, block_size,
|
2007-08-10 01:30:24 +00:00
|
|
|
num_segments, n, desired, push_to_ourselves)
|
2007-07-20 01:21:44 +00:00
|
|
|
return d
|
|
|
|
|
2007-07-24 02:31:53 +00:00
|
|
|
def set_shareholders(self, used_peers, encoder):
|
2007-03-30 21:54:33 +00:00
|
|
|
"""
|
|
|
|
@param used_peers: a sequence of PeerTracker objects
|
|
|
|
"""
|
|
|
|
log.msg("_send_shares, used_peers is %s" % (used_peers,))
|
|
|
|
for peer in used_peers:
|
|
|
|
assert isinstance(peer, PeerTracker)
|
2007-03-30 03:19:52 +00:00
|
|
|
buckets = {}
|
|
|
|
for peer in used_peers:
|
|
|
|
buckets.update(peer.buckets)
|
|
|
|
assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
|
2007-07-24 02:31:53 +00:00
|
|
|
encoder.set_shareholders(buckets)
|
2007-06-02 01:48:01 +00:00
|
|
|
|
2007-07-24 02:31:53 +00:00
|
|
|
def _compute_uri(self, (uri_extension_hash,
|
|
|
|
needed_shares, total_shares, size),
|
|
|
|
key):
|
|
|
|
u = uri.CHKFileURI(key=key,
|
2007-07-21 22:40:36 +00:00
|
|
|
uri_extension_hash=uri_extension_hash,
|
2007-07-24 02:31:53 +00:00
|
|
|
needed_shares=needed_shares,
|
|
|
|
total_shares=total_shares,
|
|
|
|
size=size,
|
2007-07-21 22:40:36 +00:00
|
|
|
)
|
|
|
|
return u.to_string()
|
2006-12-01 09:54:28 +00:00
|
|
|
|
2007-07-20 01:21:44 +00:00
|
|
|
def read_this_many_bytes(uploadable, size, prepend_data=[]):
|
2007-07-20 05:53:29 +00:00
|
|
|
if size == 0:
|
|
|
|
return defer.succeed([])
|
2007-07-20 01:21:44 +00:00
|
|
|
d = uploadable.read(size)
|
|
|
|
def _got(data):
|
2007-07-20 05:53:29 +00:00
|
|
|
assert isinstance(data, list)
|
2007-07-20 01:21:44 +00:00
|
|
|
bytes = sum([len(piece) for piece in data])
|
|
|
|
assert bytes > 0
|
|
|
|
assert bytes <= size
|
|
|
|
remaining = size - bytes
|
|
|
|
if remaining:
|
|
|
|
return read_this_many_bytes(uploadable, remaining,
|
|
|
|
prepend_data + data)
|
|
|
|
return prepend_data + data
|
|
|
|
d.addCallback(_got)
|
|
|
|
return d
|
|
|
|
|
2007-07-12 20:22:36 +00:00
|
|
|
class LiteralUploader:
|
|
|
|
|
2007-07-24 02:31:53 +00:00
|
|
|
def __init__(self, client, options={}):
|
2007-07-12 20:22:36 +00:00
|
|
|
self._client = client
|
|
|
|
self._options = options
|
|
|
|
|
2007-07-20 01:21:44 +00:00
|
|
|
def set_params(self, encoding_parameters):
|
|
|
|
pass
|
2007-07-12 20:22:36 +00:00
|
|
|
|
2007-07-24 02:31:53 +00:00
|
|
|
def start(self, uploadable):
|
|
|
|
uploadable = IUploadable(uploadable)
|
|
|
|
d = uploadable.get_size()
|
|
|
|
d.addCallback(lambda size: read_this_many_bytes(uploadable, size))
|
2007-07-21 22:40:36 +00:00
|
|
|
d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
|
|
|
|
d.addCallback(lambda u: u.to_string())
|
2007-07-20 01:21:44 +00:00
|
|
|
return d
|
2007-07-12 20:22:36 +00:00
|
|
|
|
2007-07-20 01:21:44 +00:00
|
|
|
def close(self):
|
|
|
|
pass
|
2007-01-16 04:22:22 +00:00
|
|
|
|
2006-12-04 02:07:41 +00:00
|
|
|
|
2007-07-20 01:21:44 +00:00
|
|
|
class ConvergentUploadMixin:
|
|
|
|
# to use this, the class it is mixed in to must have a seekable
|
|
|
|
# filehandle named self._filehandle
|
2007-07-24 02:31:53 +00:00
|
|
|
_params = None
|
|
|
|
_key = None
|
|
|
|
|
|
|
|
def set_serialized_encoding_parameters(self, params):
|
|
|
|
self._params = params
|
|
|
|
# ignored for now
|
|
|
|
|
|
|
|
def get_encryption_key(self):
|
|
|
|
if self._key is None:
|
|
|
|
f = self._filehandle
|
|
|
|
enckey_hasher = hashutil.key_hasher()
|
|
|
|
#enckey_hasher.update(encoding_parameters) # TODO
|
|
|
|
f.seek(0)
|
|
|
|
BLOCKSIZE = 64*1024
|
|
|
|
while True:
|
|
|
|
data = f.read(BLOCKSIZE)
|
|
|
|
if not data:
|
|
|
|
break
|
|
|
|
enckey_hasher.update(data)
|
|
|
|
f.seek(0)
|
|
|
|
self._key = enckey_hasher.digest()[:16]
|
|
|
|
|
|
|
|
return defer.succeed(self._key)
|
2007-07-20 01:21:44 +00:00
|
|
|
|
|
|
|
class NonConvergentUploadMixin:
|
2007-07-24 02:31:53 +00:00
|
|
|
_key = None
|
|
|
|
|
|
|
|
def set_serialized_encoding_parameters(self, params):
|
|
|
|
pass
|
|
|
|
|
2007-07-20 01:21:44 +00:00
|
|
|
def get_encryption_key(self, encoding_parameters):
|
2007-07-24 02:31:53 +00:00
|
|
|
if self._key is None:
|
|
|
|
self._key = os.urandom(16)
|
|
|
|
return defer.succeed(self._key)
|
2007-07-20 01:21:44 +00:00
|
|
|
|
|
|
|
|
|
|
|
class FileHandle(ConvergentUploadMixin):
|
2006-12-04 02:07:41 +00:00
|
|
|
implements(IUploadable)
|
2007-07-20 01:21:44 +00:00
|
|
|
|
2006-12-04 02:07:41 +00:00
|
|
|
def __init__(self, filehandle):
|
|
|
|
self._filehandle = filehandle
|
2007-07-20 01:21:44 +00:00
|
|
|
|
|
|
|
def get_size(self):
|
|
|
|
self._filehandle.seek(0,2)
|
|
|
|
size = self._filehandle.tell()
|
|
|
|
self._filehandle.seek(0)
|
|
|
|
return defer.succeed(size)
|
|
|
|
|
|
|
|
def read(self, length):
|
|
|
|
return defer.succeed([self._filehandle.read(length)])
|
|
|
|
|
|
|
|
def close(self):
|
2006-12-04 02:07:41 +00:00
|
|
|
# the originator of the filehandle reserves the right to close it
|
|
|
|
pass
|
|
|
|
|
2007-07-20 01:21:44 +00:00
|
|
|
class FileName(FileHandle):
|
|
|
|
def __init__(self, filename):
|
|
|
|
FileHandle.__init__(self, open(filename, "rb"))
|
|
|
|
def close(self):
|
|
|
|
FileHandle.close(self)
|
|
|
|
self._filehandle.close()
|
|
|
|
|
|
|
|
class Data(FileHandle):
|
|
|
|
def __init__(self, data):
|
|
|
|
FileHandle.__init__(self, StringIO(data))
|
|
|
|
|
2006-12-03 01:27:18 +00:00
|
|
|
class Uploader(service.MultiService):
|
|
|
|
"""I am a service that allows file uploading.
|
|
|
|
"""
|
2007-01-21 22:01:34 +00:00
|
|
|
implements(IUploader)
|
2006-12-03 01:27:18 +00:00
|
|
|
name = "uploader"
|
2007-07-20 01:21:44 +00:00
|
|
|
uploader_class = CHKUploader
|
2007-07-12 20:22:36 +00:00
|
|
|
URI_LIT_SIZE_THRESHOLD = 55
|
2006-12-03 01:27:18 +00:00
|
|
|
|
2007-07-12 22:33:30 +00:00
|
|
|
DEFAULT_ENCODING_PARAMETERS = (25, 75, 100)
|
|
|
|
# this is a tuple of (needed, desired, total). 'needed' is the number of
|
|
|
|
# shares required to reconstruct a file. 'desired' means that we will
|
|
|
|
# abort an upload unless we can allocate space for at least this many.
|
|
|
|
# 'total' is the total number of shares created by encoding. If everybody
|
|
|
|
# has room then this is is how many we will upload.
|
2007-03-30 03:19:52 +00:00
|
|
|
|
2007-07-20 01:21:44 +00:00
|
|
|
def upload(self, uploadable, options={}):
|
2007-01-19 09:23:03 +00:00
|
|
|
# this returns the URI
|
2006-12-03 01:27:18 +00:00
|
|
|
assert self.parent
|
|
|
|
assert self.running
|
2007-08-10 01:30:24 +00:00
|
|
|
push_to_ourselves = self.parent.get_push_to_ourselves()
|
|
|
|
if push_to_ourselves is not None:
|
|
|
|
options["push_to_ourselves"] = push_to_ourselves
|
|
|
|
|
2007-07-20 01:21:44 +00:00
|
|
|
uploadable = IUploadable(uploadable)
|
|
|
|
d = uploadable.get_size()
|
|
|
|
def _got_size(size):
|
|
|
|
uploader_class = self.uploader_class
|
|
|
|
if size <= self.URI_LIT_SIZE_THRESHOLD:
|
|
|
|
uploader_class = LiteralUploader
|
2007-07-24 02:31:53 +00:00
|
|
|
uploader = uploader_class(self.parent, options)
|
2007-07-20 01:21:44 +00:00
|
|
|
uploader.set_params(self.parent.get_encoding_parameters()
|
|
|
|
or self.DEFAULT_ENCODING_PARAMETERS)
|
2007-07-24 02:31:53 +00:00
|
|
|
return uploader.start(uploadable)
|
2007-07-20 01:21:44 +00:00
|
|
|
d.addCallback(_got_size)
|
2006-12-04 02:07:41 +00:00
|
|
|
def _done(res):
|
2007-07-20 01:21:44 +00:00
|
|
|
uploadable.close()
|
2006-12-04 02:07:41 +00:00
|
|
|
return res
|
|
|
|
d.addBoth(_done)
|
2006-12-03 01:27:18 +00:00
|
|
|
return d
|
|
|
|
|
2006-12-04 02:07:41 +00:00
|
|
|
# utility functions
|
2007-04-17 20:40:47 +00:00
|
|
|
def upload_data(self, data, options={}):
|
|
|
|
return self.upload(Data(data), options)
|
|
|
|
def upload_filename(self, filename, options={}):
|
|
|
|
return self.upload(FileName(filename), options)
|
|
|
|
def upload_filehandle(self, filehandle, options={}):
|
|
|
|
return self.upload(FileHandle(filehandle), options)
|