2008-03-01 05:19:03 +00:00
|
|
|
import os, time, weakref, itertools
|
2007-01-21 22:01:34 +00:00
|
|
|
from zope.interface import implements
|
2008-01-10 03:25:50 +00:00
|
|
|
from twisted.python import failure
|
2006-12-01 09:54:28 +00:00
|
|
|
from twisted.internet import defer
|
2006-12-03 01:27:18 +00:00
|
|
|
from twisted.application import service
|
2009-05-22 00:38:23 +00:00
|
|
|
from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
|
2006-12-03 01:27:18 +00:00
|
|
|
|
2007-08-28 02:00:18 +00:00
|
|
|
from allmydata.util.hashutil import file_renewal_secret_hash, \
|
|
|
|
file_cancel_secret_hash, bucket_renewal_secret_hash, \
|
|
|
|
bucket_cancel_secret_hash, plaintext_hasher, \
|
2008-03-24 16:46:06 +00:00
|
|
|
storage_index_hash, plaintext_segment_hasher, convergence_hasher
|
2009-02-18 21:46:55 +00:00
|
|
|
from allmydata import hashtree, uri
|
|
|
|
from allmydata.storage.server import si_b2a
|
2008-07-16 20:14:39 +00:00
|
|
|
from allmydata.immutable import encode
|
2009-01-12 18:00:22 +00:00
|
|
|
from allmydata.util import base32, dictutil, idlib, log, mathutil
|
2010-05-14 00:49:17 +00:00
|
|
|
from allmydata.util.happinessutil import servers_of_happiness, \
|
|
|
|
shares_by_server, merge_peers, \
|
|
|
|
failure_message
|
2007-12-20 00:55:28 +00:00
|
|
|
from allmydata.util.assertutil import precondition
|
2009-05-22 00:46:32 +00:00
|
|
|
from allmydata.util.rrefutil import add_version_to_remote_reference
|
2008-02-06 04:01:38 +00:00
|
|
|
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
|
2008-11-22 03:29:32 +00:00
|
|
|
IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
|
2009-12-05 05:30:37 +00:00
|
|
|
NoServersError, InsufficientVersionError, UploadUnhappinessError
|
2008-10-10 00:08:00 +00:00
|
|
|
from allmydata.immutable import layout
|
2007-12-04 00:27:46 +00:00
|
|
|
from pycryptopp.cipher.aes import AES
|
2006-12-01 09:54:28 +00:00
|
|
|
|
2006-12-03 03:31:43 +00:00
|
|
|
from cStringIO import StringIO
|
|
|
|
|
2007-07-12 20:22:36 +00:00
|
|
|
|
2008-01-16 10:03:35 +00:00
|
|
|
# Binary (power-of-two) size units, expressed in bytes.
KiB = 2 ** 10
MiB = 2 ** 20
GiB = 2 ** 30
TiB = 2 ** 40
PiB = 2 ** 50
|
|
|
|
|
2006-12-01 09:54:28 +00:00
|
|
|
class HaveAllPeersError(Exception):
    """Control-flow signal: we use this to jump out of the loop."""
|
|
|
|
|
|
|
|
# NOTE: this wants to live in storage, not here
class TooFullError(Exception):
    """Exception type with no behaviour of its own beyond Exception."""
|
|
|
|
|
2008-02-06 08:52:25 +00:00
|
|
|
class UploadResults(Copyable, RemoteCopy):
    """Results of one upload: timings, share placement, size and URI.

    Instances of this class are sent from the helper to its client, so think
    twice about changing the shape of any existing attribute: doing so may
    break compatibility. Consider adding new fields instead of modifying
    existing ones.
    """
    implements(IUploadResults)

    # NOTE: don't change this string. It needs to match the value used on
    # the helper, and it does *not* need to match the fully-qualified
    # package/module/class name.
    copytype = typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"

    def __init__(self):
        # Where the shares ended up.
        self.servermap = dictutil.DictOfSets()  # {serverid: set(shnum)}
        self.sharemap = dictutil.DictOfSets()  # {shnum: set(serverid)}
        self.preexisting_shares = None  # count of shares already present
        self.pushed_shares = None  # count of shares we pushed

        # Facts about the uploaded file itself.
        self.uri = None
        self.file_size = None
        self.ciphertext_fetched = None  # how much the helper fetched

        # How long each phase took.
        self.timings = {}  # dict of name to number of seconds
|
|
|
|
|
2008-02-06 04:01:38 +00:00
|
|
|
|
2007-07-13 22:09:01 +00:00
|
|
|
# Space, in bytes, to allow for the uri_extension when sizing a share.
# When this value was chosen, our uri_extension was 846 bytes for small
# files, and a few bytes more for larger ones (since the filesize is encoded
# in decimal in a few places), so we ask for a little bit more just in case
# we need it. If the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
EXTENSION_SIZE = 1000
# TODO: a later measurement found actual extensions to be closer to 419
# bytes, so we can probably lower this.
|
2007-07-13 22:09:01 +00:00
|
|
|
|
2007-03-30 03:19:52 +00:00
|
|
|
class PeerTracker:
    """Per-server bookkeeping while placing the shares of one upload.

    I hold a remote reference to one storage server together with the share
    geometry and lease secrets of one upload, and I collect the bucket
    writers that the server returns when asked to allocate shares.
    """

    def __init__(self, peerid, storage_server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 storage_index,
                 bucket_renewal_secret, bucket_cancel_secret):
        """
        @param peerid: 20-character str identifying the server (enforced by
            precondition)
        @param storage_server: remote reference (to an RIStorageServer) that
            the 'allocate_buckets' and 'get_buckets' messages are sent to
        @param sharesize: passed through to each write-bucket proxy
        @param blocksize: passed through to each write-bucket proxy
        @param num_segments: passed through to each write-bucket proxy
        @param num_share_hashes: passed through to each write-bucket proxy
        @param storage_index: storage index the shares are stored under
        @param bucket_renewal_secret: renew secret sent with allocate_buckets
        @param bucket_cancel_secret: cancel secret sent with allocate_buckets
        """
        precondition(isinstance(peerid, str), peerid)
        precondition(len(peerid) == 20, peerid)
        self.peerid = peerid
        self._storageserver = storage_server # to an RIStorageServer
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize

        # Build one throw-away proxy (with no remote reference) just to learn
        # which proxy class to use and how much space to ask the server for.
        wbp = layout.make_write_bucket_proxy(None, sharesize,
                                             blocksize, num_segments,
                                             num_share_hashes,
                                             EXTENSION_SIZE, peerid)
        self.wbp_class = wbp.__class__ # to create more of them
        self.allocated_size = wbp.get_allocated_size()
        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index

        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

    def __repr__(self):
        return ("<PeerTracker for peer %s and SI %s>"
                % (idlib.shortnodeid_b2a(self.peerid),
                   si_b2a(self.storage_index)[:5]))

    def query(self, sharenums):
        """
        Ask the server to allocate buckets for the given share numbers.

        @param sharenums: the share numbers to request
        @return: a Deferred that fires with the result of _got_reply, i.e.
            (alreadygot, set of share numbers allocated by this query)
        """
        d = self._storageserver.callRemote("allocate_buckets",
                                           self.storage_index,
                                           self.renew_secret,
                                           self.cancel_secret,
                                           sharenums,
                                           self.allocated_size,
                                           canary=Referenceable())
        d.addCallback(self._got_reply)
        return d

    def ask_about_existing_shares(self):
        """
        Send 'get_buckets' for my storage index, without allocating anything.

        @return: whatever callRemote returns for that message
        """
        return self._storageserver.callRemote("get_buckets",
                                              self.storage_index)

    def _got_reply(self, reply):
        """
        Wrap every bucket the server allocated in a write-bucket proxy and
        remember it in self.buckets.

        @param reply: the (alreadygot, buckets) pair answering
            allocate_buckets; buckets maps share number to a remote bucket
            reference
        @return: (alreadygot, set of the share numbers in buckets)
        """
        # Unpack here rather than in the signature: tuple parameters are
        # Python-2-only syntax (removed by PEP 3113).
        (alreadygot, buckets) = reply
        b = {}
        for sharenum, rref in buckets.items():
            bp = self.wbp_class(rref, self.sharesize,
                                self.blocksize,
                                self.num_segments,
                                self.num_share_hashes,
                                EXTENSION_SIZE,
                                self.peerid)
            b[sharenum] = bp
        self.buckets.update(b)
        return (alreadygot, set(b))

    def abort(self):
        """
        I abort the remote bucket writers for all shares. This is a good idea
        to conserve space on the storage server.
        """
        # Hand over a snapshot of the keys: abort_some_buckets deletes
        # entries from self.buckets as it goes.
        self.abort_some_buckets(list(self.buckets))

    def abort_some_buckets(self, sharenums):
        """
        I abort the remote bucket writers for the share numbers in sharenums.
        Share numbers that I hold no bucket for are ignored.
        """
        # Copy first, so that a caller may safely pass a live view of
        # self.buckets.
        for sharenum in list(sharenums):
            if sharenum in self.buckets:
                self.buckets[sharenum].abort()
                del self.buckets[sharenum]
|
2010-07-15 23:17:14 +00:00
|
|
|
|
|
|
|
|
2007-09-16 08:24:07 +00:00
|
|
|
class Tahoe2PeerSelector:
|
|
|
|
|
2008-02-12 22:36:05 +00:00
|
|
|
def __init__(self, upload_id, logparent=None, upload_status=None):
|
2007-09-16 08:24:07 +00:00
|
|
|
self.upload_id = upload_id
|
|
|
|
self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
|
2009-11-23 01:24:05 +00:00
|
|
|
# Peers that are working normally, but full.
|
|
|
|
self.full_count = 0
|
2007-09-16 08:24:07 +00:00
|
|
|
self.error_count = 0
|
|
|
|
self.num_peers_contacted = 0
|
|
|
|
self.last_failure_msg = None
|
2008-02-12 22:36:05 +00:00
|
|
|
self._status = IUploadStatus(upload_status)
|
2008-01-15 04:19:20 +00:00
|
|
|
self._log_parent = log.msg("%s starting" % self, parent=logparent)
|
2007-09-16 08:24:07 +00:00
|
|
|
|
|
|
|
def __repr__(self):
|
|
|
|
return "<Tahoe2PeerSelector for upload %s>" % self.upload_id
|
|
|
|
|
Overhaul IFilesystemNode handling, to simplify tests and use POLA internally.
* stop using IURI as an adapter
* pass cap strings around instead of URI instances
* move filenode/dirnode creation duties from Client to new NodeMaker class
* move other Client duties to KeyGenerator, SecretHolder, History classes
* stop passing Client reference to dirnode/filenode constructors
- pass less-powerful references instead, like StorageBroker or Uploader
* always create DirectoryNodes by wrapping a filenode (mutable for now)
* remove some specialized mock classes from unit tests
Detailed list of changes (done one at a time, then merged together)
always pass a string to create_node_from_uri(), not an IURI instance
always pass a string to IFilesystemNode constructors, not an IURI instance
stop using IURI() as an adapter, switch on cap prefix in create_node_from_uri()
client.py: move SecretHolder code out to a separate class
test_web.py: hush pyflakes
client.py: move NodeMaker functionality out into a separate object
LiteralFileNode: stop storing a Client reference
immutable Checker: remove Client reference, it only needs a SecretHolder
immutable Upload: remove Client reference, leave SecretHolder and StorageBroker
immutable Repairer: replace Client reference with StorageBroker and SecretHolder
immutable FileNode: remove Client reference
mutable.Publish: stop passing Client
mutable.ServermapUpdater: get StorageBroker in constructor, not by peeking into Client reference
MutableChecker: reference StorageBroker and History directly, not through Client
mutable.FileNode: removed unused indirection to checker classes
mutable.FileNode: remove Client reference
client.py: move RSA key generation into a separate class, so it can be passed to the nodemaker
move create_mutable_file() into NodeMaker
test_dirnode.py: stop using FakeClient mockups, use NoNetworkGrid instead. This simplifies the code, but takes longer to run (17s instead of 6s). This should come down later when other cleanups make it possible to use simpler (non-RSA) fake mutable files for dirnode tests.
test_mutable.py: clean up basedir names
client.py: move create_empty_dirnode() into NodeMaker
dirnode.py: get rid of DirectoryNode.create
remove DirectoryNode.init_from_uri, refactor NodeMaker for customization, simplify test_web's mock Client to match
stop passing Client to DirectoryNode, make DirectoryNode.create_with_mutablefile the normal DirectoryNode constructor, start removing client from NodeMaker
remove Client from NodeMaker
move helper status into History, pass History to web.Status instead of Client
test_mutable.py: fix minor typo
2009-08-15 11:02:56 +00:00
|
|
|
def get_shareholders(self, storage_broker, secret_holder,
|
2007-09-16 08:24:07 +00:00
|
|
|
storage_index, share_size, block_size,
|
2009-12-30 22:03:44 +00:00
|
|
|
num_segments, total_shares, needed_shares,
|
|
|
|
servers_of_happiness):
|
2007-09-16 08:24:07 +00:00
|
|
|
"""
|
2010-07-19 04:47:44 +00:00
|
|
|
@return: (upload_servers, already_peers), where upload_servers is a set of
|
2008-03-06 02:51:51 +00:00
|
|
|
PeerTracker instances that have agreed to hold some shares
|
2010-07-19 04:47:44 +00:00
|
|
|
for us (the shareids are stashed inside the PeerTracker),
|
2010-05-14 00:49:17 +00:00
|
|
|
and already_peers is a dict mapping shnum to a set of peers
|
|
|
|
which claim to already have the share.
|
2007-09-16 08:24:07 +00:00
|
|
|
"""
|
|
|
|
|
2008-02-12 22:36:05 +00:00
|
|
|
if self._status:
|
|
|
|
self._status.set_status("Contacting Peers..")
|
|
|
|
|
2007-09-16 08:24:07 +00:00
|
|
|
self.total_shares = total_shares
|
2009-11-04 12:12:22 +00:00
|
|
|
self.servers_of_happiness = servers_of_happiness
|
2009-12-30 22:03:44 +00:00
|
|
|
self.needed_shares = needed_shares
|
2007-09-16 08:24:07 +00:00
|
|
|
|
|
|
|
self.homeless_shares = range(total_shares)
|
2007-09-17 00:08:34 +00:00
|
|
|
self.contacted_peers = [] # peers worth asking again
|
|
|
|
self.contacted_peers2 = [] # peers that we have asked again
|
2008-01-15 04:19:20 +00:00
|
|
|
self._started_second_pass = False
|
2007-09-16 08:24:07 +00:00
|
|
|
self.use_peers = set() # PeerTrackers that have shares assigned to them
|
2010-05-14 00:49:17 +00:00
|
|
|
self.preexisting_shares = {} # shareid => set(peerids) holding shareid
|
|
|
|
# We don't try to allocate shares to these servers, since they've said
|
|
|
|
# that they're incapable of storing shares of the size that we'd want
|
|
|
|
# to store. We keep them around because they may have existing shares
|
|
|
|
# for this storage index, which we want to know about for accurate
|
|
|
|
# servers_of_happiness accounting
|
|
|
|
# (this is eventually a list, but it is initialized later)
|
|
|
|
self.readonly_peers = None
|
|
|
|
# These peers have shares -- any shares -- for our SI. We keep
|
|
|
|
# track of these to write an error message with them later.
|
|
|
|
self.peers_with_shares = set()
|
2007-09-16 08:24:07 +00:00
|
|
|
|
|
|
|
# this needed_hashes computation should mirror
|
|
|
|
# Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
|
|
|
|
# (instead of a HashTree) because we don't require actual hashing
|
|
|
|
# just to count the levels.
|
|
|
|
ht = hashtree.IncompleteHashTree(total_shares)
|
|
|
|
num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
|
|
|
|
|
2008-11-22 03:28:12 +00:00
|
|
|
# figure out how much space to ask for
|
2009-01-13 03:14:42 +00:00
|
|
|
wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
|
|
|
|
num_share_hashes, EXTENSION_SIZE,
|
|
|
|
None)
|
|
|
|
allocated_size = wbp.get_allocated_size()
|
2010-05-14 00:49:17 +00:00
|
|
|
all_peers = storage_broker.get_servers_for_index(storage_index)
|
|
|
|
if not all_peers:
|
|
|
|
raise NoServersError("client gave us zero peers")
|
2009-01-13 03:14:42 +00:00
|
|
|
|
2008-11-22 03:28:12 +00:00
|
|
|
# filter the list of peers according to which ones can accomodate
|
|
|
|
# this request. This excludes older peers (which used a 4-byte size
|
|
|
|
# field) from getting large shares (for files larger than about
|
|
|
|
# 12GiB). See #439 for details.
|
|
|
|
def _get_maxsize(peer):
|
|
|
|
(peerid, conn) = peer
|
|
|
|
v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
|
|
|
|
return v1["maximum-immutable-share-size"]
|
2010-05-14 00:49:17 +00:00
|
|
|
writable_peers = [peer for peer in all_peers
|
|
|
|
if _get_maxsize(peer) >= allocated_size]
|
|
|
|
readonly_peers = set(all_peers[:2*total_shares]) - set(writable_peers)
|
2008-11-22 03:28:12 +00:00
|
|
|
|
2007-09-16 08:24:07 +00:00
|
|
|
# decide upon the renewal/cancel secrets, to include them in the
|
Overhaul IFilesystemNode handling, to simplify tests and use POLA internally.
* stop using IURI as an adapter
* pass cap strings around instead of URI instances
* move filenode/dirnode creation duties from Client to new NodeMaker class
* move other Client duties to KeyGenerator, SecretHolder, History classes
* stop passing Client reference to dirnode/filenode constructors
- pass less-powerful references instead, like StorageBroker or Uploader
* always create DirectoryNodes by wrapping a filenode (mutable for now)
* remove some specialized mock classes from unit tests
Detailed list of changes (done one at a time, then merged together)
always pass a string to create_node_from_uri(), not an IURI instance
always pass a string to IFilesystemNode constructors, not an IURI instance
stop using IURI() as an adapter, switch on cap prefix in create_node_from_uri()
client.py: move SecretHolder code out to a separate class
test_web.py: hush pyflakes
client.py: move NodeMaker functionality out into a separate object
LiteralFileNode: stop storing a Client reference
immutable Checker: remove Client reference, it only needs a SecretHolder
immutable Upload: remove Client reference, leave SecretHolder and StorageBroker
immutable Repairer: replace Client reference with StorageBroker and SecretHolder
immutable FileNode: remove Client reference
mutable.Publish: stop passing Client
mutable.ServermapUpdater: get StorageBroker in constructor, not by peeking into Client reference
MutableChecker: reference StorageBroker and History directly, not through Client
mutable.FileNode: removed unused indirection to checker classes
mutable.FileNode: remove Client reference
client.py: move RSA key generation into a separate class, so it can be passed to the nodemaker
move create_mutable_file() into NodeMaker
test_dirnode.py: stop using FakeClient mockups, use NoNetworkGrid instead. This simplifies the code, but takes longer to run (17s instead of 6s). This should come down later when other cleanups make it possible to use simpler (non-RSA) fake mutable files for dirnode tests.
test_mutable.py: clean up basedir names
client.py: move create_empty_dirnode() into NodeMaker
dirnode.py: get rid of DirectoryNode.create
remove DirectoryNode.init_from_uri, refactor NodeMaker for customization, simplify test_web's mock Client to match
stop passing Client to DirectoryNode, make DirectoryNode.create_with_mutablefile the normal DirectoryNode constructor, start removing client from NodeMaker
remove Client from NodeMaker
move helper status into History, pass History to web.Status instead of Client
test_mutable.py: fix minor typo
2009-08-15 11:02:56 +00:00
|
|
|
# allocate_buckets query.
|
|
|
|
client_renewal_secret = secret_holder.get_renewal_secret()
|
|
|
|
client_cancel_secret = secret_holder.get_cancel_secret()
|
2007-09-16 08:24:07 +00:00
|
|
|
|
|
|
|
file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
|
|
|
|
storage_index)
|
|
|
|
file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
|
|
|
|
storage_index)
|
2009-11-16 20:28:05 +00:00
|
|
|
def _make_trackers(peers):
|
2010-05-14 00:49:17 +00:00
|
|
|
return [PeerTracker(peerid, conn,
|
|
|
|
share_size, block_size,
|
|
|
|
num_segments, num_share_hashes,
|
|
|
|
storage_index,
|
|
|
|
bucket_renewal_secret_hash(file_renewal_secret,
|
|
|
|
peerid),
|
|
|
|
bucket_cancel_secret_hash(file_cancel_secret,
|
|
|
|
peerid))
|
2009-11-16 20:28:05 +00:00
|
|
|
for (peerid, conn) in peers]
|
2010-05-14 00:49:17 +00:00
|
|
|
self.uncontacted_peers = _make_trackers(writable_peers)
|
|
|
|
self.readonly_peers = _make_trackers(readonly_peers)
|
|
|
|
# We now ask peers that can't hold any new shares about existing
|
|
|
|
# shares that they might have for our SI. Once this is done, we
|
|
|
|
# start placing the shares that we haven't already accounted
|
|
|
|
# for.
|
|
|
|
ds = []
|
|
|
|
if self._status and self.readonly_peers:
|
|
|
|
self._status.set_status("Contacting readonly peers to find "
|
|
|
|
"any existing shares")
|
|
|
|
for peer in self.readonly_peers:
|
2009-11-16 20:28:05 +00:00
|
|
|
assert isinstance(peer, PeerTracker)
|
2010-05-14 00:49:17 +00:00
|
|
|
d = peer.ask_about_existing_shares()
|
2009-11-23 01:24:05 +00:00
|
|
|
d.addBoth(self._handle_existing_response, peer.peerid)
|
2010-05-14 00:49:17 +00:00
|
|
|
ds.append(d)
|
2009-11-23 01:24:05 +00:00
|
|
|
self.num_peers_contacted += 1
|
|
|
|
self.query_count += 1
|
2010-05-14 00:49:17 +00:00
|
|
|
log.msg("asking peer %s for any existing shares for "
|
|
|
|
"upload id %s"
|
2009-11-23 01:24:05 +00:00
|
|
|
% (idlib.shortnodeid_b2a(peer.peerid), self.upload_id),
|
|
|
|
level=log.NOISY, parent=self._log_parent)
|
2010-05-14 00:49:17 +00:00
|
|
|
dl = defer.DeferredList(ds)
|
|
|
|
dl.addCallback(lambda ign: self._loop())
|
|
|
|
return dl
|
|
|
|
|
2009-11-16 20:28:05 +00:00
|
|
|
|
2009-11-23 01:24:05 +00:00
|
|
|
def _handle_existing_response(self, res, peer):
|
2010-05-14 00:49:17 +00:00
|
|
|
"""
|
|
|
|
I handle responses to the queries sent by
|
|
|
|
Tahoe2PeerSelector._existing_shares.
|
|
|
|
"""
|
2009-11-23 01:24:05 +00:00
|
|
|
if isinstance(res, failure.Failure):
|
|
|
|
log.msg("%s got error during existing shares check: %s"
|
|
|
|
% (idlib.shortnodeid_b2a(peer), res),
|
|
|
|
level=log.UNUSUAL, parent=self._log_parent)
|
|
|
|
self.error_count += 1
|
|
|
|
self.bad_query_count += 1
|
|
|
|
else:
|
|
|
|
buckets = res
|
2009-12-30 22:03:44 +00:00
|
|
|
if buckets:
|
2010-05-14 00:49:17 +00:00
|
|
|
self.peers_with_shares.add(peer)
|
2009-11-23 01:24:05 +00:00
|
|
|
log.msg("response from peer %s: alreadygot=%s"
|
|
|
|
% (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))),
|
|
|
|
level=log.NOISY, parent=self._log_parent)
|
|
|
|
for bucket in buckets:
|
2010-05-14 00:49:17 +00:00
|
|
|
self.preexisting_shares.setdefault(bucket, set()).add(peer)
|
|
|
|
if self.homeless_shares and bucket in self.homeless_shares:
|
|
|
|
self.homeless_shares.remove(bucket)
|
2009-11-23 01:24:05 +00:00
|
|
|
self.full_count += 1
|
|
|
|
self.bad_query_count += 1
|
2010-05-14 00:49:17 +00:00
|
|
|
|
2009-11-04 12:12:22 +00:00
|
|
|
|
2009-12-30 22:03:44 +00:00
|
|
|
def _get_progress_message(self):
|
|
|
|
if not self.homeless_shares:
|
|
|
|
msg = "placed all %d shares, " % (self.total_shares)
|
|
|
|
else:
|
|
|
|
msg = ("placed %d shares out of %d total (%d homeless), " %
|
|
|
|
(self.total_shares - len(self.homeless_shares),
|
|
|
|
self.total_shares,
|
|
|
|
len(self.homeless_shares)))
|
|
|
|
return (msg + "want to place shares on at least %d servers such that "
|
|
|
|
"any %d of them have enough shares to recover the file, "
|
|
|
|
"sent %d queries to %d peers, "
|
|
|
|
"%d queries placed some shares, %d placed none "
|
|
|
|
"(of which %d placed none due to the server being"
|
|
|
|
" full and %d placed none due to an error)" %
|
|
|
|
(self.servers_of_happiness, self.needed_shares,
|
|
|
|
self.query_count, self.num_peers_contacted,
|
|
|
|
self.good_query_count, self.bad_query_count,
|
|
|
|
self.full_count, self.error_count))
|
|
|
|
|
|
|
|
|
2007-09-16 08:24:07 +00:00
|
|
|
def _loop(self):
|
|
|
|
if not self.homeless_shares:
|
2010-05-14 00:49:17 +00:00
|
|
|
merged = merge_peers(self.preexisting_shares, self.use_peers)
|
|
|
|
effective_happiness = servers_of_happiness(merged)
|
|
|
|
if self.servers_of_happiness <= effective_happiness:
|
2009-12-30 22:03:44 +00:00
|
|
|
msg = ("peer selection successful for %s: %s" % (self,
|
|
|
|
self._get_progress_message()))
|
|
|
|
log.msg(msg, parent=self._log_parent)
|
2009-11-04 12:12:22 +00:00
|
|
|
return (self.use_peers, self.preexisting_shares)
|
|
|
|
else:
|
2010-05-14 00:49:17 +00:00
|
|
|
# We're not okay right now, but maybe we can fix it by
|
|
|
|
# redistributing some shares. In cases where one or two
|
|
|
|
# servers has, before the upload, all or most of the
|
|
|
|
# shares for a given SI, this can work by allowing _loop
|
|
|
|
# a chance to spread those out over the other peers,
|
|
|
|
delta = self.servers_of_happiness - effective_happiness
|
2009-11-04 12:12:22 +00:00
|
|
|
shares = shares_by_server(self.preexisting_shares)
|
|
|
|
# Each server in shares maps to a set of shares stored on it.
|
|
|
|
# Since we want to keep at least one share on each server
|
|
|
|
# that has one (otherwise we'd only be making
|
|
|
|
# the situation worse by removing distinct servers),
|
|
|
|
# each server has len(its shares) - 1 to spread around.
|
|
|
|
shares_to_spread = sum([len(list(sharelist)) - 1
|
|
|
|
for (server, sharelist)
|
|
|
|
in shares.items()])
|
|
|
|
if delta <= len(self.uncontacted_peers) and \
|
|
|
|
shares_to_spread >= delta:
|
|
|
|
items = shares.items()
|
|
|
|
while len(self.homeless_shares) < delta:
|
2010-05-14 00:49:17 +00:00
|
|
|
# Loop through the allocated shares, removing
|
|
|
|
# one from each server that has more than one
|
|
|
|
# and putting it back into self.homeless_shares
|
|
|
|
# until we've done this delta times.
|
|
|
|
server, sharelist = items.pop()
|
2009-11-04 12:12:22 +00:00
|
|
|
if len(sharelist) > 1:
|
|
|
|
share = sharelist.pop()
|
|
|
|
self.homeless_shares.append(share)
|
2010-05-14 00:49:17 +00:00
|
|
|
self.preexisting_shares[share].remove(server)
|
|
|
|
if not self.preexisting_shares[share]:
|
|
|
|
del self.preexisting_shares[share]
|
|
|
|
items.append((server, sharelist))
|
2010-07-19 04:46:55 +00:00
|
|
|
for writer in self.use_peers:
|
|
|
|
writer.abort_some_buckets(self.homeless_shares)
|
2009-11-04 12:12:22 +00:00
|
|
|
return self._loop()
|
|
|
|
else:
|
2010-05-14 00:49:17 +00:00
|
|
|
# Redistribution won't help us; fail.
|
|
|
|
peer_count = len(self.peers_with_shares)
|
|
|
|
msg = failure_message(peer_count,
|
|
|
|
self.needed_shares,
|
|
|
|
self.servers_of_happiness,
|
|
|
|
effective_happiness)
|
2010-07-19 04:46:55 +00:00
|
|
|
log.msg("server selection unsuccessful for %r: %s (%s), merged=%r"
|
|
|
|
% (self, msg, self._get_progress_message(), merged), level=log.INFREQUENT)
|
2010-07-15 23:17:14 +00:00
|
|
|
return self._failed("%s (%s)" % (msg, self._get_progress_message()))
|
2007-09-16 08:24:07 +00:00
|
|
|
|
|
|
|
if self.uncontacted_peers:
|
|
|
|
peer = self.uncontacted_peers.pop(0)
|
|
|
|
# TODO: don't pre-convert all peerids to PeerTrackers
|
|
|
|
assert isinstance(peer, PeerTracker)
|
|
|
|
|
|
|
|
shares_to_ask = set([self.homeless_shares.pop(0)])
|
|
|
|
self.query_count += 1
|
|
|
|
self.num_peers_contacted += 1
|
2008-02-12 22:36:05 +00:00
|
|
|
if self._status:
|
|
|
|
self._status.set_status("Contacting Peers [%s] (first query),"
|
|
|
|
" %d shares left.."
|
|
|
|
% (idlib.shortnodeid_b2a(peer.peerid),
|
|
|
|
len(self.homeless_shares)))
|
2007-09-16 08:24:07 +00:00
|
|
|
d = peer.query(shares_to_ask)
|
2007-09-17 00:08:34 +00:00
|
|
|
d.addBoth(self._got_response, peer, shares_to_ask,
|
|
|
|
self.contacted_peers)
|
2007-09-16 08:24:07 +00:00
|
|
|
return d
|
2007-09-17 00:08:34 +00:00
|
|
|
elif self.contacted_peers:
|
2007-09-16 08:24:07 +00:00
|
|
|
# ask a peer that we've already asked.
|
2008-01-15 04:19:20 +00:00
|
|
|
if not self._started_second_pass:
|
|
|
|
log.msg("starting second pass", parent=self._log_parent,
|
|
|
|
level=log.NOISY)
|
|
|
|
self._started_second_pass = True
|
2007-09-17 00:08:34 +00:00
|
|
|
num_shares = mathutil.div_ceil(len(self.homeless_shares),
|
|
|
|
len(self.contacted_peers))
|
2007-09-16 08:24:07 +00:00
|
|
|
peer = self.contacted_peers.pop(0)
|
2007-09-17 00:08:34 +00:00
|
|
|
shares_to_ask = set(self.homeless_shares[:num_shares])
|
|
|
|
self.homeless_shares[:num_shares] = []
|
2007-09-16 08:24:07 +00:00
|
|
|
self.query_count += 1
|
2008-02-12 22:36:05 +00:00
|
|
|
if self._status:
|
|
|
|
self._status.set_status("Contacting Peers [%s] (second query),"
|
|
|
|
" %d shares left.."
|
|
|
|
% (idlib.shortnodeid_b2a(peer.peerid),
|
|
|
|
len(self.homeless_shares)))
|
2007-09-16 08:24:07 +00:00
|
|
|
d = peer.query(shares_to_ask)
|
2007-09-17 00:08:34 +00:00
|
|
|
d.addBoth(self._got_response, peer, shares_to_ask,
|
|
|
|
self.contacted_peers2)
|
2007-09-16 08:24:07 +00:00
|
|
|
return d
|
2007-09-17 00:08:34 +00:00
|
|
|
elif self.contacted_peers2:
|
|
|
|
# we've finished the second-or-later pass. Move all the remaining
|
|
|
|
# peers back into self.contacted_peers for the next pass.
|
|
|
|
self.contacted_peers.extend(self.contacted_peers2)
|
2009-07-16 23:01:20 +00:00
|
|
|
self.contacted_peers2[:] = []
|
2007-09-17 00:08:34 +00:00
|
|
|
return self._loop()
|
2007-09-16 08:24:07 +00:00
|
|
|
else:
|
|
|
|
# no more peers. If we haven't placed enough shares, we fail.
|
2010-05-14 00:49:17 +00:00
|
|
|
merged = merge_peers(self.preexisting_shares, self.use_peers)
|
|
|
|
effective_happiness = servers_of_happiness(merged)
|
|
|
|
if effective_happiness < self.servers_of_happiness:
|
|
|
|
msg = failure_message(len(self.peers_with_shares),
|
|
|
|
self.needed_shares,
|
|
|
|
self.servers_of_happiness,
|
|
|
|
effective_happiness)
|
|
|
|
msg = ("peer selection failed for %s: %s (%s)" % (self,
|
|
|
|
msg,
|
2009-12-30 22:03:44 +00:00
|
|
|
self._get_progress_message()))
|
2007-09-16 08:24:07 +00:00
|
|
|
if self.last_failure_msg:
|
|
|
|
msg += " (%s)" % (self.last_failure_msg,)
|
2008-01-15 04:19:20 +00:00
|
|
|
log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
|
2010-07-15 23:17:14 +00:00
|
|
|
return self._failed(msg)
|
2007-09-16 08:24:07 +00:00
|
|
|
else:
|
|
|
|
# we placed enough to be happy, so we're done
|
2008-02-12 22:36:05 +00:00
|
|
|
if self._status:
|
|
|
|
self._status.set_status("Placed all shares")
|
2009-11-18 02:45:42 +00:00
|
|
|
return (self.use_peers, self.preexisting_shares)
|
2007-09-16 08:24:07 +00:00
|
|
|
|
2007-09-17 00:08:34 +00:00
|
|
|
def _got_response(self, res, peer, shares_to_ask, put_peer_here):
|
2007-09-16 08:24:07 +00:00
|
|
|
if isinstance(res, failure.Failure):
|
|
|
|
# This is unusual, and probably indicates a bug or a network
|
|
|
|
# problem.
|
2008-01-15 04:19:20 +00:00
|
|
|
log.msg("%s got error during peer selection: %s" % (peer, res),
|
|
|
|
level=log.UNUSUAL, parent=self._log_parent)
|
2007-09-16 08:24:07 +00:00
|
|
|
self.error_count += 1
|
2009-11-23 01:24:05 +00:00
|
|
|
self.bad_query_count += 1
|
2007-09-16 08:24:07 +00:00
|
|
|
self.homeless_shares = list(shares_to_ask) + self.homeless_shares
|
2007-09-17 00:08:34 +00:00
|
|
|
if (self.uncontacted_peers
|
|
|
|
or self.contacted_peers
|
|
|
|
or self.contacted_peers2):
|
2007-09-16 08:24:07 +00:00
|
|
|
# there is still hope, so just loop
|
|
|
|
pass
|
|
|
|
else:
|
|
|
|
# No more peers, so this upload might fail (it depends upon
|
2009-11-16 22:24:59 +00:00
|
|
|
# whether we've hit servers_of_happiness or not). Log the last
|
2007-09-16 08:24:07 +00:00
|
|
|
# failure we got: if a coding error causes all peers to fail
|
|
|
|
# in the same way, this allows the common failure to be seen
|
|
|
|
# by the uploader and should help with debugging
|
|
|
|
msg = ("last failure (from %s) was: %s" % (peer, res))
|
|
|
|
self.last_failure_msg = msg
|
|
|
|
else:
|
|
|
|
(alreadygot, allocated) = res
|
2008-01-15 04:19:20 +00:00
|
|
|
log.msg("response from peer %s: alreadygot=%s, allocated=%s"
|
|
|
|
% (idlib.shortnodeid_b2a(peer.peerid),
|
|
|
|
tuple(sorted(alreadygot)), tuple(sorted(allocated))),
|
|
|
|
level=log.NOISY, parent=self._log_parent)
|
2007-09-16 08:24:07 +00:00
|
|
|
progress = False
|
|
|
|
for s in alreadygot:
|
2010-05-14 00:49:17 +00:00
|
|
|
self.preexisting_shares.setdefault(s, set()).add(peer.peerid)
|
|
|
|
if s in self.homeless_shares:
|
|
|
|
self.homeless_shares.remove(s)
|
|
|
|
progress = True
|
|
|
|
elif s in shares_to_ask:
|
|
|
|
progress = True
|
2007-09-16 08:24:07 +00:00
|
|
|
|
|
|
|
# the PeerTracker will remember which shares were allocated on
|
|
|
|
# that peer. We just have to remember to use them.
|
|
|
|
if allocated:
|
|
|
|
self.use_peers.add(peer)
|
|
|
|
progress = True
|
|
|
|
|
2009-12-30 22:03:44 +00:00
|
|
|
if allocated or alreadygot:
|
2010-05-14 00:49:17 +00:00
|
|
|
self.peers_with_shares.add(peer.peerid)
|
2009-12-30 22:03:44 +00:00
|
|
|
|
2007-09-16 08:24:07 +00:00
|
|
|
not_yet_present = set(shares_to_ask) - set(alreadygot)
|
|
|
|
still_homeless = not_yet_present - set(allocated)
|
|
|
|
|
|
|
|
if progress:
|
2010-05-14 00:49:17 +00:00
|
|
|
# They accepted at least one of the shares that we asked
|
|
|
|
# them to accept, or they had a share that we didn't ask
|
|
|
|
# them to accept but that we hadn't placed yet, so this
|
|
|
|
# was a productive query
|
2007-09-16 08:24:07 +00:00
|
|
|
self.good_query_count += 1
|
|
|
|
else:
|
|
|
|
self.bad_query_count += 1
|
2009-11-23 01:24:05 +00:00
|
|
|
self.full_count += 1
|
2007-09-16 08:24:07 +00:00
|
|
|
|
|
|
|
if still_homeless:
|
|
|
|
# In networks with lots of space, this is very unusual and
|
|
|
|
# probably indicates an error. In networks with peers that
|
|
|
|
# are full, it is merely unusual. In networks that are very
|
|
|
|
# full, it is common, and many uploads will fail. In most
|
|
|
|
# cases, this is obviously not fatal, and we'll just use some
|
|
|
|
# other peers.
|
|
|
|
|
|
|
|
# some shares are still homeless, keep trying to find them a
|
|
|
|
# home. The ones that were rejected get first priority.
|
|
|
|
self.homeless_shares = (list(still_homeless)
|
|
|
|
+ self.homeless_shares)
|
|
|
|
# Since they were unable to accept all of our requests, so it
|
|
|
|
# is safe to assume that asking them again won't help.
|
|
|
|
else:
|
|
|
|
# if they *were* able to accept everything, they might be
|
|
|
|
# willing to accept even more.
|
2007-09-17 00:08:34 +00:00
|
|
|
put_peer_here.append(peer)
|
2007-09-16 08:24:07 +00:00
|
|
|
|
|
|
|
# now loop
|
|
|
|
return self._loop()
|
|
|
|
|
2007-07-20 01:21:44 +00:00
|
|
|
|
2010-07-15 23:17:14 +00:00
|
|
|
def _failed(self, msg):
    """
    Give up on peer selection and report the failure.

    Every PeerTracker collected in self.use_peers during this
    (unsuccessful) attempt is asked to abort, so that the remote buckets
    allocated on our behalf are not left behind. After that an
    UploadUnhappinessError carrying msg is raised; this method never
    returns normally.
    """
    for tracker in self.use_peers:
        # only PeerTracker instances are expected in use_peers
        assert isinstance(tracker, PeerTracker)
        tracker.abort()
    raise UploadUnhappinessError(msg)
|
|
|
|
|
|
|
|
|
2007-07-24 02:31:53 +00:00
|
|
|
class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable.

    Plaintext is pulled from the wrapped uploadable in pieces of at most
    CHUNKSIZE bytes. Each piece is fed to a whole-file plaintext hasher and
    to per-segment plaintext hashers, and is run through an AES encryptor
    built from the uploadable's encryption key.
    """
    implements(IEncryptedUploadable)
    # upper bound on the number of bytes requested from original.read() in
    # a single call (see _read_encrypted)
    CHUNKSIZE = 50*1024

    def __init__(self, original, log_parent=None):
        """
        @param original: the object to wrap; it is adapted to IUploadable
        @param log_parent: used as the default 'parent' of every message
                           emitted through self.log()
        """
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None
        self._plaintext_hasher = plaintext_hasher()
        # None means "no segment hash is currently open"
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None
        self._file_size = None
        self._ciphertext_bytes_read = 0
        self._status = None

    def set_upload_status(self, upload_status):
        """Remember the status object (adapted to IUploadStatus) and pass
        the same object on to the wrapped uploadable."""
        self._status = IUploadStatus(upload_status)
        self.original.set_upload_status(upload_status)

    def log(self, *args, **kwargs):
        """Call log.msg(), filling in the 'facility' and 'parent' keyword
        arguments when the caller did not supply them."""
        if "facility" not in kwargs:
            kwargs["facility"] = "upload.encryption"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def get_size(self):
        """Return a Deferred that fires with the size reported by the
        wrapped uploadable. The value is cached after the first answer, and
        is also reported to the status object if one has been set."""
        if self._file_size is not None:
            return defer.succeed(self._file_size)
        d = self.original.get_size()
        def _got_size(size):
            self._file_size = size
            if self._status:
                self._status.set_size(size)
            return size
        d.addCallback(_got_size)
        return d

    def get_all_encoding_parameters(self):
        """Return a Deferred that fires with the (k, happy, n, segsize)
        tuple obtained from the wrapped uploadable. The tuple is cached, and
        segsize is remembered for the per-segment hashers."""
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            # the unpacking also checks that we were given a 4-tuple
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
                     level=log.NOISY)
            return encoding_parameters
        d.addCallback(_got)
        return d

    def _get_encryptor(self):
        """Return a Deferred that fires with the AES encryptor.

        On first use the encryption key is fetched from the wrapped
        uploadable, the encryptor is created from it, and the storage index
        is derived from the same key and recorded (on self and, if set, on
        the status object). Later calls reuse the cached encryptor.
        """
        if self._encryptor:
            return defer.succeed(self._encryptor)

        d = self.original.get_encryption_key()
        def _got(key):
            e = AES(key)
            self._encryptor = e

            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16  # SHA-256 truncated to 128b
            self._storage_index = storage_index
            if self._status:
                self._status.set_storage_index(storage_index)
            return e
        d.addCallback(_got)
        return d

    def get_storage_index(self):
        """Return a Deferred that fires with the storage index. It is
        computed as a side effect of creating the encryptor."""
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
        return d

    def _get_segment_hasher(self):
        """Return (hasher, bytes_left) for the segment currently being
        hashed, opening a fresh segment hasher if none is open.

        bytes_left is how many more bytes fit into this segment before it
        reaches self._segment_size.
        """
        p = self._plaintext_segment_hasher
        if p:
            left = self._segment_size - self._plaintext_segment_hashed_bytes
            return p, left
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size

    def _update_segment_hash(self, chunk):
        """Feed chunk into the per-segment plaintext hashers.

        The chunk may straddle segment boundaries: it is consumed piecewise,
        and each time a segment becomes full its digest is appended to
        self._plaintext_segment_hashes and the hasher is closed.
        """
        offset = 0
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                         level=log.NOISY)
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
                         level=log.NOISY)

            offset += this_segment

    def read_encrypted(self, length, hash_only):
        """Consume 'length' bytes of plaintext from the wrapped uploadable.

        Returns a Deferred that fires with a list of ciphertext chunks. When
        hash_only is true the plaintext is still hashed (and run through the
        encryptor), but no ciphertext is collected, so the list is empty.
        """
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        # and size
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        # ciphertext.
        ciphertext = []
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)
        return d

    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        """Read up to CHUNKSIZE bytes, process them, and schedule the next
        round until 'remaining' reaches zero.

        Ciphertext is accumulated in the 'ciphertext' list. When everything
        has been read fire_when_done is called back with that list; any
        failure is delivered to fire_when_done's errback instead. Always
        returns None.
        """
        if not remaining:
            fire_when_done.callback(ciphertext)
            return None
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(fireEventually)
        def _good(plaintext):
            # and encrypt it..
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
                                 fire_when_done)
        def _err(why):
            fire_when_done.errback(why)
        d.addCallback(_good)
        d.addErrback(_err)
        return None

    def _hash_and_encrypt_plaintext(self, data, hash_only):
        """Hash and encrypt a sequence of plaintext chunks.

        @param data: a tuple or list of plaintext chunks
        @param hash_only: when true, no ciphertext is returned
        @return: a list with one ciphertext chunk per plaintext chunk, or an
                 empty list when hash_only is true

        Also advances the count of bytes processed and, if a status object
        is set, reports the resulting progress as item 1.
        """
        assert isinstance(data, (tuple, list)), type(data)
        data = list(data)
        cryptdata = []
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
        bytes_processed = 0
        while data:
            chunk = data.pop(0)
            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
                     level=log.NOISY)
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because pycryptopp's AES-CTR implementation doesn't offer a
            # way to change the counter value. Once pycryptopp acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) _encryptor.process()
            ciphertext = self._encryptor.process(chunk)
            if hash_only:
                self.log("  skipping encryption", level=log.NOISY)
            else:
                cryptdata.append(ciphertext)
            del ciphertext
            del chunk
        self._ciphertext_bytes_read += bytes_processed
        # Progress is a fraction of the file size, so there is nothing
        # meaningful to report for a zero-length file; skipping the update
        # also keeps status reporting from raising ZeroDivisionError.
        if self._status and self._file_size:
            progress = float(self._ciphertext_bytes_read) / self._file_size
            self._status.set_progress(1, progress)
        return cryptdata

    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        """Return a Deferred that fires with a tuple of the per-segment
        plaintext hashes in the slice [first:last].

        If the final segment's hasher is still open (i.e. fewer than
        num_segments hashes have been recorded) it is closed first.
        """
        # this is currently unused, but will live again when we fix #453
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            # Reset to None (the "no open segment" marker used everywhere
            # else in this class) instead of deleting the attribute, so that
            # any later _get_segment_hasher() call does not die with an
            # AttributeError.
            self._plaintext_segment_hasher = None
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
                     level=log.NOISY)
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
                     level=log.NOISY)
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))

    def get_plaintext_hash(self):
        """Return a Deferred that fires with the digest of all plaintext
        hashed so far."""
        h = self._plaintext_hasher.digest()
        return defer.succeed(h)

    def close(self):
        """Close the wrapped uploadable and return whatever its close()
        returns."""
        return self.original.close()
|
|
|
|
|
2008-02-12 22:36:05 +00:00
|
|
|
class UploadStatus:
    """Plain mutable record describing one upload, exposed through the
    IUploadStatus accessor methods.

    Each instance takes the next value from the class-wide statusid_counter
    and notes the wall-clock time at which it was created.
    """
    implements(IUploadStatus)
    # shared by every instance: source of the per-status 'counter' value
    statusid_counter = itertools.count(0)

    def __init__(self):
        # filled in later through the set_*() methods
        self.storage_index = None
        self.size = None
        self.results = None
        # initial state of a freshly created status
        self.helper = False
        self.status = "Not started"
        self.active = True
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress = [0.0, 0.0, 0.0]
        # identity and creation time; neither has a setter
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    # --- read-only values -------------------------------------------------

    def get_started(self):
        """Return the time.time() value recorded at construction."""
        return self.started

    def get_counter(self):
        """Return the number drawn from statusid_counter at construction."""
        return self.counter

    # --- storage index ----------------------------------------------------

    def get_storage_index(self):
        return self.storage_index

    def set_storage_index(self, si):
        self.storage_index = si

    # --- size ---------------------------------------------------------------

    def get_size(self):
        return self.size

    def set_size(self, size):
        self.size = size

    # --- helper flag ------------------------------------------------------

    def using_helper(self):
        return self.helper

    def set_helper(self, helper):
        self.helper = helper

    # --- status text ------------------------------------------------------

    def get_status(self):
        return self.status

    def set_status(self, status):
        self.status = status

    # --- progress ---------------------------------------------------------

    def get_progress(self):
        """Return the three progress values as a tuple (a snapshot, so the
        caller cannot modify the internal list)."""
        return tuple(self.progress)

    def set_progress(self, which, value):
        """Store value in progress slot 'which'."""
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value

    # --- active flag ------------------------------------------------------

    def get_active(self):
        return self.active

    def set_active(self, value):
        self.active = value

    # --- results ----------------------------------------------------------

    def get_results(self):
        return self.results

    def set_results(self, value):
        self.results = value
|
2007-07-24 02:31:53 +00:00
|
|
|
|
2007-07-20 01:21:44 +00:00
|
|
|
class CHKUploader:
|
2007-09-16 08:25:03 +00:00
|
|
|
peer_selector_class = Tahoe2PeerSelector
|
2007-07-20 01:21:44 +00:00
|
|
|
|
Overhaul IFilesystemNode handling, to simplify tests and use POLA internally.
* stop using IURI as an adapter
* pass cap strings around instead of URI instances
* move filenode/dirnode creation duties from Client to new NodeMaker class
* move other Client duties to KeyGenerator, SecretHolder, History classes
* stop passing Client reference to dirnode/filenode constructors
- pass less-powerful references instead, like StorageBroker or Uploader
* always create DirectoryNodes by wrapping a filenode (mutable for now)
* remove some specialized mock classes from unit tests
Detailed list of changes (done one at a time, then merged together)
always pass a string to create_node_from_uri(), not an IURI instance
always pass a string to IFilesystemNode constructors, not an IURI instance
stop using IURI() as an adapter, switch on cap prefix in create_node_from_uri()
client.py: move SecretHolder code out to a separate class
test_web.py: hush pyflakes
client.py: move NodeMaker functionality out into a separate object
LiteralFileNode: stop storing a Client reference
immutable Checker: remove Client reference, it only needs a SecretHolder
immutable Upload: remove Client reference, leave SecretHolder and StorageBroker
immutable Repairer: replace Client reference with StorageBroker and SecretHolder
immutable FileNode: remove Client reference
mutable.Publish: stop passing Client
mutable.ServermapUpdater: get StorageBroker in constructor, not by peeking into Client reference
MutableChecker: reference StorageBroker and History directly, not through Client
mutable.FileNode: removed unused indirection to checker classes
mutable.FileNode: remove Client reference
client.py: move RSA key generation into a separate class, so it can be passed to the nodemaker
move create_mutable_file() into NodeMaker
test_dirnode.py: stop using FakeClient mockups, use NoNetworkGrid instead. This simplifies the code, but takes longer to run (17s instead of 6s). This should come down later when other cleanups make it possible to use simpler (non-RSA) fake mutable files for dirnode tests.
test_mutable.py: clean up basedir names
client.py: move create_empty_dirnode() into NodeMaker
dirnode.py: get rid of DirectoryNode.create
remove DirectoryNode.init_from_uri, refactor NodeMaker for customization, simplify test_web's mock Client to match
stop passing Client to DirectoryNode, make DirectoryNode.create_with_mutablefile the normal DirectoryNode constructor, start removing client from NodeMaker
remove Client from NodeMaker
move helper status into History, pass History to web.Status instead of Client
test_mutable.py: fix minor typo
2009-08-15 11:02:56 +00:00
|
|
|
def __init__(self, storage_broker, secret_holder):
|
|
|
|
# peer_selector needs storage_broker and secret_holder
|
|
|
|
self._storage_broker = storage_broker
|
|
|
|
self._secret_holder = secret_holder
|
|
|
|
self._log_number = self.log("CHKUploader starting", parent=None)
|
2008-01-15 04:22:55 +00:00
|
|
|
self._encoder = None
|
2008-02-06 07:41:51 +00:00
|
|
|
self._results = UploadResults()
|
2008-02-12 22:36:05 +00:00
|
|
|
self._storage_index = None
|
|
|
|
self._upload_status = UploadStatus()
|
|
|
|
self._upload_status.set_helper(False)
|
2008-02-26 22:35:28 +00:00
|
|
|
self._upload_status.set_active(True)
|
2008-03-03 21:48:52 +00:00
|
|
|
self._upload_status.set_results(self._results)
|
2007-07-20 01:21:44 +00:00
|
|
|
|
2009-01-10 18:46:23 +00:00
|
|
|
# locate_all_shareholders() will create the following attribute:
|
|
|
|
# self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
|
|
|
|
|
2008-01-15 04:19:20 +00:00
|
|
|
def log(self, *args, **kwargs):
|
|
|
|
if "parent" not in kwargs:
|
|
|
|
kwargs["parent"] = self._log_number
|
|
|
|
if "facility" not in kwargs:
|
|
|
|
kwargs["facility"] = "tahoe.upload"
|
Overhaul IFilesystemNode handling, to simplify tests and use POLA internally.
* stop using IURI as an adapter
* pass cap strings around instead of URI instances
* move filenode/dirnode creation duties from Client to new NodeMaker class
* move other Client duties to KeyGenerator, SecretHolder, History classes
* stop passing Client reference to dirnode/filenode constructors
- pass less-powerful references instead, like StorageBroker or Uploader
* always create DirectoryNodes by wrapping a filenode (mutable for now)
* remove some specialized mock classes from unit tests
Detailed list of changes (done one at a time, then merged together)
always pass a string to create_node_from_uri(), not an IURI instance
always pass a string to IFilesystemNode constructors, not an IURI instance
stop using IURI() as an adapter, switch on cap prefix in create_node_from_uri()
client.py: move SecretHolder code out to a separate class
test_web.py: hush pyflakes
client.py: move NodeMaker functionality out into a separate object
LiteralFileNode: stop storing a Client reference
immutable Checker: remove Client reference, it only needs a SecretHolder
immutable Upload: remove Client reference, leave SecretHolder and StorageBroker
immutable Repairer: replace Client reference with StorageBroker and SecretHolder
immutable FileNode: remove Client reference
mutable.Publish: stop passing Client
mutable.ServermapUpdater: get StorageBroker in constructor, not by peeking into Client reference
MutableChecker: reference StorageBroker and History directly, not through Client
mutable.FileNode: removed unused indirection to checker classes
mutable.FileNode: remove Client reference
client.py: move RSA key generation into a separate class, so it can be passed to the nodemaker
move create_mutable_file() into NodeMaker
test_dirnode.py: stop using FakeClient mockups, use NoNetworkGrid instead. This simplifies the code, but takes longer to run (17s instead of 6s). This should come down later when other cleanups make it possible to use simpler (non-RSA) fake mutable files for dirnode tests.
test_mutable.py: clean up basedir names
client.py: move create_empty_dirnode() into NodeMaker
dirnode.py: get rid of DirectoryNode.create
remove DirectoryNode.init_from_uri, refactor NodeMaker for customization, simplify test_web's mock Client to match
stop passing Client to DirectoryNode, make DirectoryNode.create_with_mutablefile the normal DirectoryNode constructor, start removing client from NodeMaker
remove Client from NodeMaker
move helper status into History, pass History to web.Status instead of Client
test_mutable.py: fix minor typo
2009-08-15 11:02:56 +00:00
|
|
|
return log.msg(*args, **kwargs)
|
2007-11-20 02:33:41 +00:00
|
|
|
|
2009-01-07 04:48:22 +00:00
|
|
|
def start(self, encrypted_uploadable):
|
2007-07-20 01:21:44 +00:00
|
|
|
"""Start uploading the file.
|
|
|
|
|
2009-01-07 04:48:22 +00:00
|
|
|
Returns a Deferred that will fire with the UploadResults instance.
|
|
|
|
"""
|
2007-07-20 01:21:44 +00:00
|
|
|
|
2008-02-06 07:41:51 +00:00
|
|
|
self._started = time.time()
|
2009-01-07 04:48:22 +00:00
|
|
|
eu = IEncryptedUploadable(encrypted_uploadable)
|
|
|
|
self.log("starting upload of %s" % eu)
|
2007-07-24 02:31:53 +00:00
|
|
|
|
2008-02-12 22:36:05 +00:00
|
|
|
eu.set_upload_status(self._upload_status)
|
2007-07-24 02:31:53 +00:00
|
|
|
d = self.start_encrypted(eu)
|
2009-01-07 04:48:22 +00:00
|
|
|
def _done(uploadresults):
|
2008-02-26 22:35:28 +00:00
|
|
|
self._upload_status.set_active(False)
|
2009-01-07 04:48:22 +00:00
|
|
|
return uploadresults
|
2008-02-26 22:35:28 +00:00
|
|
|
d.addBoth(_done)
|
2007-07-24 02:31:53 +00:00
|
|
|
return d
|
2007-07-20 01:21:44 +00:00
|
|
|
|
2008-01-15 04:22:55 +00:00
|
|
|
def abort(self):
|
2009-01-10 21:56:01 +00:00
|
|
|
"""Call this if the upload must be abandoned before it completes.
|
2008-01-15 04:22:55 +00:00
|
|
|
This will tell the shareholders to delete their partial shares. I
|
|
|
|
return a Deferred that fires when these messages have been acked."""
|
|
|
|
if not self._encoder:
|
|
|
|
# how did you call abort() before calling start() ?
|
|
|
|
return defer.succeed(None)
|
|
|
|
return self._encoder.abort()
|
|
|
|
|
2007-07-24 02:31:53 +00:00
|
|
|
def start_encrypted(self, encrypted):
|
2009-01-07 04:48:22 +00:00
|
|
|
""" Returns a Deferred that will fire with the UploadResults instance. """
|
2007-07-24 02:31:53 +00:00
|
|
|
eu = IEncryptedUploadable(encrypted)
|
|
|
|
|
2008-02-06 08:52:25 +00:00
|
|
|
started = time.time()
|
2008-02-12 22:36:05 +00:00
|
|
|
self._encoder = e = encode.Encoder(self._log_number,
|
|
|
|
self._upload_status)
|
2007-07-24 02:31:53 +00:00
|
|
|
d = e.set_encrypted_uploadable(eu)
|
2008-02-06 08:52:25 +00:00
|
|
|
d.addCallback(self.locate_all_shareholders, started)
|
2007-07-24 02:31:53 +00:00
|
|
|
d.addCallback(self.set_shareholders, e)
|
|
|
|
d.addCallback(lambda res: e.start())
|
2008-02-06 08:52:25 +00:00
|
|
|
d.addCallback(self._encrypted_done)
|
2007-07-20 01:21:44 +00:00
|
|
|
return d
|
|
|
|
|
2008-02-06 08:52:25 +00:00
|
|
|
def locate_all_shareholders(self, encoder, started):
|
|
|
|
peer_selection_started = now = time.time()
|
|
|
|
self._storage_index_elapsed = now - started
|
Overhaul IFilesystemNode handling, to simplify tests and use POLA internally.
* stop using IURI as an adapter
* pass cap strings around instead of URI instances
* move filenode/dirnode creation duties from Client to new NodeMaker class
* move other Client duties to KeyGenerator, SecretHolder, History classes
* stop passing Client reference to dirnode/filenode constructors
- pass less-powerful references instead, like StorageBroker or Uploader
* always create DirectoryNodes by wrapping a filenode (mutable for now)
* remove some specialized mock classes from unit tests
Detailed list of changes (done one at a time, then merged together)
always pass a string to create_node_from_uri(), not an IURI instance
always pass a string to IFilesystemNode constructors, not an IURI instance
stop using IURI() as an adapter, switch on cap prefix in create_node_from_uri()
client.py: move SecretHolder code out to a separate class
test_web.py: hush pyflakes
client.py: move NodeMaker functionality out into a separate object
LiteralFileNode: stop storing a Client reference
immutable Checker: remove Client reference, it only needs a SecretHolder
immutable Upload: remove Client reference, leave SecretHolder and StorageBroker
immutable Repairer: replace Client reference with StorageBroker and SecretHolder
immutable FileNode: remove Client reference
mutable.Publish: stop passing Client
mutable.ServermapUpdater: get StorageBroker in constructor, not by peeking into Client reference
MutableChecker: reference StorageBroker and History directly, not through Client
mutable.FileNode: removed unused indirection to checker classes
mutable.FileNode: remove Client reference
client.py: move RSA key generation into a separate class, so it can be passed to the nodemaker
move create_mutable_file() into NodeMaker
test_dirnode.py: stop using FakeClient mockups, use NoNetworkGrid instead. This simplifies the code, but takes longer to run (17s instead of 6s). This should come down later when other cleanups make it possible to use simpler (non-RSA) fake mutable files for dirnode tests.
test_mutable.py: clean up basedir names
client.py: move create_empty_dirnode() into NodeMaker
dirnode.py: get rid of DirectoryNode.create
remove DirectoryNode.init_from_uri, refactor NodeMaker for customization, simplify test_web's mock Client to match
stop passing Client to DirectoryNode, make DirectoryNode.create_with_mutablefile the normal DirectoryNode constructor, start removing client from NodeMaker
remove Client from NodeMaker
move helper status into History, pass History to web.Status instead of Client
test_mutable.py: fix minor typo
2009-08-15 11:02:56 +00:00
|
|
|
storage_broker = self._storage_broker
|
|
|
|
secret_holder = self._secret_holder
|
2007-07-24 02:31:53 +00:00
|
|
|
storage_index = encoder.get_param("storage_index")
|
2008-02-12 22:36:05 +00:00
|
|
|
self._storage_index = storage_index
|
2009-02-18 21:46:55 +00:00
|
|
|
upload_id = si_b2a(storage_index)[:5]
|
2007-11-20 02:33:41 +00:00
|
|
|
self.log("using storage index %s" % upload_id)
|
2008-02-12 22:36:05 +00:00
|
|
|
peer_selector = self.peer_selector_class(upload_id, self._log_number,
|
|
|
|
self._upload_status)
|
2007-09-16 08:24:07 +00:00
|
|
|
|
2007-07-24 02:31:53 +00:00
|
|
|
share_size = encoder.get_param("share_size")
|
|
|
|
block_size = encoder.get_param("block_size")
|
|
|
|
num_segments = encoder.get_param("num_segments")
|
|
|
|
k,desired,n = encoder.get_param("share_counts")
|
|
|
|
|
2008-02-06 07:41:51 +00:00
|
|
|
self._peer_selection_started = time.time()
|
Overhaul IFilesystemNode handling, to simplify tests and use POLA internally.
* stop using IURI as an adapter
* pass cap strings around instead of URI instances
* move filenode/dirnode creation duties from Client to new NodeMaker class
* move other Client duties to KeyGenerator, SecretHolder, History classes
* stop passing Client reference to dirnode/filenode constructors
- pass less-powerful references instead, like StorageBroker or Uploader
* always create DirectoryNodes by wrapping a filenode (mutable for now)
* remove some specialized mock classes from unit tests
Detailed list of changes (done one at a time, then merged together)
always pass a string to create_node_from_uri(), not an IURI instance
always pass a string to IFilesystemNode constructors, not an IURI instance
stop using IURI() as an adapter, switch on cap prefix in create_node_from_uri()
client.py: move SecretHolder code out to a separate class
test_web.py: hush pyflakes
client.py: move NodeMaker functionality out into a separate object
LiteralFileNode: stop storing a Client reference
immutable Checker: remove Client reference, it only needs a SecretHolder
immutable Upload: remove Client reference, leave SecretHolder and StorageBroker
immutable Repairer: replace Client reference with StorageBroker and SecretHolder
immutable FileNode: remove Client reference
mutable.Publish: stop passing Client
mutable.ServermapUpdater: get StorageBroker in constructor, not by peeking into Client reference
MutableChecker: reference StorageBroker and History directly, not through Client
mutable.FileNode: removed unused indirection to checker classes
mutable.FileNode: remove Client reference
client.py: move RSA key generation into a separate class, so it can be passed to the nodemaker
move create_mutable_file() into NodeMaker
test_dirnode.py: stop using FakeClient mockups, use NoNetworkGrid instead. This simplifies the code, but takes longer to run (17s instead of 6s). This should come down later when other cleanups make it possible to use simpler (non-RSA) fake mutable files for dirnode tests.
test_mutable.py: clean up basedir names
client.py: move create_empty_dirnode() into NodeMaker
dirnode.py: get rid of DirectoryNode.create
remove DirectoryNode.init_from_uri, refactor NodeMaker for customization, simplify test_web's mock Client to match
stop passing Client to DirectoryNode, make DirectoryNode.create_with_mutablefile the normal DirectoryNode constructor, start removing client from NodeMaker
remove Client from NodeMaker
move helper status into History, pass History to web.Status instead of Client
test_mutable.py: fix minor typo
2009-08-15 11:02:56 +00:00
|
|
|
d = peer_selector.get_shareholders(storage_broker, secret_holder,
|
|
|
|
storage_index,
|
2008-02-05 21:16:01 +00:00
|
|
|
share_size, block_size,
|
2009-12-30 22:03:44 +00:00
|
|
|
num_segments, n, k, desired)
|
2008-02-06 07:41:51 +00:00
|
|
|
def _done(res):
|
2008-02-06 08:52:25 +00:00
|
|
|
self._peer_selection_elapsed = time.time() - peer_selection_started
|
2008-02-06 07:41:51 +00:00
|
|
|
return res
|
|
|
|
d.addCallback(_done)
|
2007-07-20 01:21:44 +00:00
|
|
|
return d
|
|
|
|
|
2010-07-19 04:47:44 +00:00
|
|
|
def set_shareholders(self, (upload_servers, already_peers), encoder):
|
2007-03-30 21:54:33 +00:00
|
|
|
"""
|
2010-07-19 04:47:44 +00:00
|
|
|
@param upload_servers: a sequence of PeerTracker objects that have agreed to hold some shares for us (the shareids are stashed inside the PeerTracker)
|
2010-05-14 00:49:17 +00:00
|
|
|
@paran already_peers: a dict mapping sharenum to a set of peerids
|
|
|
|
that claim to already have this share
|
2007-03-30 21:54:33 +00:00
|
|
|
"""
|
2010-07-19 04:47:44 +00:00
|
|
|
self.log("_send_shares, upload_servers is %s" % (upload_servers,))
|
2008-03-06 02:51:51 +00:00
|
|
|
# record already-present shares in self._results
|
|
|
|
self._results.preexisting_shares = len(already_peers)
|
|
|
|
|
2009-01-10 18:46:23 +00:00
|
|
|
self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
|
2010-07-19 04:47:44 +00:00
|
|
|
for peer in upload_servers:
|
2007-03-30 21:54:33 +00:00
|
|
|
assert isinstance(peer, PeerTracker)
|
2007-03-30 03:19:52 +00:00
|
|
|
buckets = {}
|
2009-11-04 12:12:22 +00:00
|
|
|
servermap = already_peers.copy()
|
2010-07-19 04:47:44 +00:00
|
|
|
for peer in upload_servers:
|
2007-03-30 03:19:52 +00:00
|
|
|
buckets.update(peer.buckets)
|
2008-02-06 07:41:51 +00:00
|
|
|
for shnum in peer.buckets:
|
2009-01-10 18:46:23 +00:00
|
|
|
self._peer_trackers[shnum] = peer
|
2010-05-14 00:49:17 +00:00
|
|
|
servermap.setdefault(shnum, set()).add(peer.peerid)
|
2010-07-19 04:47:44 +00:00
|
|
|
assert len(buckets) == sum([len(peer.buckets) for peer in upload_servers]), "%s (%s) != %s (%s)" % (len(buckets), buckets, sum([len(peer.buckets) for peer in upload_servers]), [(p.buckets, p.peerid) for p in upload_servers])
|
2009-11-04 12:12:22 +00:00
|
|
|
encoder.set_shareholders(buckets, servermap)
|
2007-06-02 01:48:01 +00:00
|
|
|
|
2009-01-07 04:48:22 +00:00
|
|
|
def _encrypted_done(self, verifycap):
|
|
|
|
""" Returns a Deferred that will fire with the UploadResults instance. """
|
2008-02-06 07:41:51 +00:00
|
|
|
r = self._results
|
|
|
|
for shnum in self._encoder.get_shares_placed():
|
2009-01-10 18:46:23 +00:00
|
|
|
peer_tracker = self._peer_trackers[shnum]
|
2008-02-06 07:41:51 +00:00
|
|
|
peerid = peer_tracker.peerid
|
2009-01-12 18:00:22 +00:00
|
|
|
r.sharemap.add(shnum, peerid)
|
|
|
|
r.servermap.add(peerid, shnum)
|
2008-03-06 02:51:51 +00:00
|
|
|
r.pushed_shares = len(self._encoder.get_shares_placed())
|
2008-02-06 07:41:51 +00:00
|
|
|
now = time.time()
|
2008-02-06 08:52:25 +00:00
|
|
|
r.file_size = self._encoder.file_size
|
2008-02-06 07:41:51 +00:00
|
|
|
r.timings["total"] = now - self._started
|
2008-02-06 08:52:25 +00:00
|
|
|
r.timings["storage_index"] = self._storage_index_elapsed
|
|
|
|
r.timings["peer_selection"] = self._peer_selection_elapsed
|
2008-02-06 07:41:51 +00:00
|
|
|
r.timings.update(self._encoder.get_times())
|
2008-02-07 00:51:11 +00:00
|
|
|
r.uri_extension_data = self._encoder.get_uri_extension_data()
|
2009-01-07 04:48:22 +00:00
|
|
|
r.verifycapstr = verifycap.to_string()
|
2008-02-06 07:41:51 +00:00
|
|
|
return r
|
2006-12-01 09:54:28 +00:00
|
|
|
|
2008-02-12 22:36:05 +00:00
|
|
|
def get_upload_status(self):
    """Return the status object stored in self._upload_status."""
    status = self._upload_status
    return status
|
2008-01-10 03:25:50 +00:00
|
|
|
|
2007-07-20 01:21:44 +00:00
|
|
|
def read_this_many_bytes(uploadable, size, prepend_data=None):
    """
    Read exactly 'size' bytes from an uploadable, re-issuing short reads.

    @param uploadable: object whose read(length) returns a Deferred that
        fires with a list of strings
    @param size: number of bytes still to be collected
    @param prepend_data: optional list of pieces collected so far; it is
        placed in front of the newly read pieces and is never modified
    @return: a Deferred that fires with the list of pieces. When size is
        0 it fires with an empty list, regardless of prepend_data.
    """
    # None stands in for "no pieces yet" so that no list object is shared
    # between calls through the default argument.
    if prepend_data is None:
        prepend_data = []
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        num_bytes = sum([len(piece) for piece in data])
        # a read must make progress and must never over-deliver
        assert num_bytes > 0
        assert num_bytes <= size
        remaining = size - num_bytes
        collected = prepend_data + data
        if remaining:
            # short read: ask again for whatever is still missing
            return read_this_many_bytes(uploadable, remaining, collected)
        return collected
    d.addCallback(_got)
    return d
|
|
|
|
|
2007-07-12 20:22:36 +00:00
|
|
|
class LiteralUploader:
    """
    Uploader that reads the whole uploadable and packs its contents
    directly into a LiteralFileURI string.
    """

    def __init__(self):
        self._results = UploadResults()
        status = UploadStatus()
        self._status = status
        status.set_storage_index(None)
        status.set_helper(False)
        status.set_progress(0, 1.0)
        status.set_active(False)
        status.set_results(self._results)

    def start(self, uploadable):
        """
        Read all of 'uploadable' and build the literal cap from it.

        @param uploadable: anything adaptable to IUploadable
        @return: a Deferred that fires with the results object, whose
            'uri' attribute holds the literal cap string
        """
        uploadable = IUploadable(uploadable)

        def _remember_size_and_read(size):
            # the size is published to both the status and the results
            self._size = size
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)

        def _make_cap_string(pieces):
            literal = uri.LiteralFileURI("".join(pieces))
            return literal.to_string()

        d = uploadable.get_size()
        d.addCallback(_remember_size_and_read)
        d.addCallback(_make_cap_string)
        d.addCallback(self._build_results)
        return d

    def _build_results(self, uri):
        """Store the cap string, mark the status finished, return results."""
        status = self._status
        self._results.uri = uri
        status.set_status("Finished")
        for which in (1, 2):
            status.set_progress(which, 1.0)
        return self._results

    def close(self):
        """Nothing to release."""
        pass

    def get_upload_status(self):
        """Return the status object created in __init__."""
        return self._status
|
|
|
|
|
2008-01-16 10:03:35 +00:00
|
|
|
class RemoteEncryptedUploadable(Referenceable):
    """
    Referenceable wrapper that exposes an IEncryptedUploadable through
    remote_* methods and keeps track of the current read offset.
    """
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._offset = 0      # position of the next byte to be read
        self._bytes_sent = 0  # bytes returned by remote_read_encrypted
        # we are responsible for updating the status string while we run,
        # and for setting the ciphertext-fetch progress.
        self._status = IUploadStatus(upload_status)
        self._size = None     # filled in by the first get_size()

    def get_size(self):
        """Return a Deferred firing with the size, caching it on first use."""
        if self._size is None:
            def _remember(size):
                self._size = size
                return size
            d = self._eu.get_size()
            d.addCallback(_remember)
            return d
        return defer.succeed(self._size)

    def remote_get_size(self):
        return self.get_size()

    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        """
        Read from the wrapped uploadable and advance self._offset.

        With hash_only the offset moves by 'length'; otherwise it moves
        by the combined length of the strings that were returned.
        """
        def _advance(strings):
            if hash_only:
                consumed = length
            else:
                consumed = sum([len(piece) for piece in strings])
            self._offset += consumed
            return strings
        d = self._eu.read_encrypted(length, hash_only)
        d.addCallback(_advance)
        return d

    def remote_read_encrypted(self, offset, length):
        """
        Return a Deferred firing with the strings read at 'offset'.

        Seeking backwards is refused; a forward gap is consumed with a
        hash-only read before the requested data is read.
        """
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)

        skip = offset - self._offset
        if skip > 0:
            # read the data from disk anyways, to build up the hash tree
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)

        def _read_requested_span(ignored):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)

        def _count_bytes_sent(strings):
            self._bytes_sent += sum([len(piece) for piece in strings])
            return strings

        d.addCallback(_read_requested_span)
        d.addCallback(_count_bytes_sent)
        return d

    def remote_close(self):
        return self._eu.close()
|
2008-01-10 00:58:47 +00:00
|
|
|
|
|
|
|
|
|
|
|
class AssistedUploader:
    """
    Uploader that delegates the upload to a remote helper.

    The helper is asked (via callRemote("upload_chk", storage_index))
    whether the file still needs uploading; if it does, the ciphertext is
    offered to it through a RemoteEncryptedUploadable.
    """

    def __init__(self, helper):
        """
        @param helper: remote reference to the helper; only its
            callRemote() method is used by this class
        """
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        s.set_helper(True)
        s.set_active(True)

    def log(self, *args, **kwargs):
        """Log via log.msg, parented to this uploader's first message
        unless the caller supplies its own 'parent'."""
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.

        @param encrypted_uploadable: anything adaptable to
            IEncryptedUploadable
        @param storage_index: the storage index, as a str

        Returns a Deferred that will fire with the UploadResults instance.
        """
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        d = eu.get_size()
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
        def _done(res):
            # runs on success and on failure alike (addBoth), so the
            # status never stays marked active
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d

    def _got_size(self, size):
        """Remember the file size and publish it to the status object."""
        self._size = size
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        """
        @param params: the 4-tuple (k, happy, n, segment_size)
        """
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _contact_helper(self, res):
        """Ask the helper about our storage index; 'res' is ignored."""
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=si_b2a(self._storage_index))
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        return d

    def _contacted_helper(self, helper_response):
        """
        Handle the helper's answer to "upload_chk".

        @param helper_response: the 2-tuple (upload_results,
            upload_helper). A false upload_helper means the file is
            already uploaded and upload_results is returned as-is.
        @return: upload_results, or a Deferred that fires with the result
            of the helper's "upload" call
        """
        # Unpack in the body instead of in the signature: tuple parameters
        # are Python-2-only syntax. The caller still passes one 2-tuple.
        upload_results, upload_helper = helper_response
        now = time.time()
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        if upload_helper:
            self.log("helper says we need to upload")
            self._upload_status.set_status("Uploading Ciphertext")
            # we need to upload the file
            reu = RemoteEncryptedUploadable(self._encuploadable,
                                            self._upload_status)
            # let it pre-compute the size for progress purposes
            d = reu.get_size()
            d.addCallback(lambda ignored:
                          upload_helper.callRemote("upload", reu))
            # this Deferred will fire with the upload results
            return d
        self.log("helper says file is already uploaded")
        self._upload_status.set_progress(1, 1.0)
        self._upload_status.set_results(upload_results)
        return upload_results

    def _convert_old_upload_results(self, upload_results):
        """Replace an old-format sharemap on upload_results with None."""
        # pre-1.3.0 helpers return upload results which contain a mapping
        # from shnum to a single human-readable string, containing things
        # like "Found on [x],[y],[z]" (for healthy files that were already in
        # the grid), "Found on [x]" (for files that needed upload but which
        # discovered pre-existing shares), and "Placed on [x]" (for newly
        # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
        # set of binary serverid strings.

        # the old results are too hard to deal with (they don't even contain
        # as much information as the new results, since the nodeids are
        # abbreviated), so if we detect old results, just clobber them.

        sharemap = upload_results.sharemap
        if sharemap is None:
            # already clobbered by an earlier call; nothing left to inspect
            return
        if str in [type(v) for v in sharemap.values()]:
            upload_results.sharemap = None

    def _build_verifycap(self, upload_results):
        """
        Check the helper's results against our own parameters, attach the
        verify cap string and timing figures, and return the results.
        """
        self.log("upload finished, building readcap")
        self._convert_old_upload_results(upload_results)
        self._upload_status.set_status("Building Readcap")
        r = upload_results
        # the helper must have used the same parameters that we computed
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
                                             uri_extension_hash=r.uri_extension_hash,
                                             needed_shares=self._needed_shares,
                                             total_shares=self._total_shares, size=self._size
                                             ).to_string()
        now = time.time()
        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            # keep the total that arrived with the results under its own
            # key before overwriting it with our end-to-end total
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Finished")
        self._upload_status.set_results(r)
        return r

    def get_upload_status(self):
        """Return the status object created in __init__."""
        return self._upload_status
|
|
|
|
|
2008-02-07 01:39:03 +00:00
|
|
|
class BaseUploadable:
    """
    Shared encoding-parameter handling for uploadables.

    The default_* attributes are fallbacks; a non-false value in the
    matching un-prefixed attribute takes precedence over the fallback.
    """
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    # cached (k, happy, n, segsize) tuple, filled in on first computation
    _all_encoding_parameters = None
    _status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        """
        Override the fallback parameters from a dict.

        @param default_params: dict with str keys and int values; the keys
            "k", "happy", "n" and "max_segment_size" are used when present
        """
        assert isinstance(default_params, dict)
        for name, value in default_params.items():
            precondition(isinstance(name, str), name, value)
            precondition(isinstance(value, int), name, value)
        recognized = (
            ("k", "default_encoding_param_k"),
            ("happy", "default_encoding_param_happy"),
            ("n", "default_encoding_param_n"),
            ("max_segment_size", "default_max_segment_size"),
        )
        for key, attribute in recognized:
            if key in default_params:
                setattr(self, attribute, default_params[key])

    def get_all_encoding_parameters(self):
        """
        Return a Deferred that fires with (k, happy, n, segsize).

        The tuple is computed once from the file size and then cached.
        """
        cached = self._all_encoding_parameters
        if cached:
            return defer.succeed(cached)

        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n
        max_segsize = self.max_segment_size or self.default_max_segment_size

        def _compute(file_size):
            # for small files, shrink the segment size to avoid wasting
            # space; the result must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(min(max_segsize, file_size), k)
            params = (k, happy, n, segsize)
            self._all_encoding_parameters = params
            return params

        d = self.get_size()
        d.addCallback(_compute)
        return d
|
2006-12-04 02:07:41 +00:00
|
|
|
|
2008-02-07 01:39:03 +00:00
|
|
|
class FileHandle(BaseUploadable):
    """Uploadable that reads its plaintext from an open, seekable filehandle."""
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self._key = None    # encryption key, once it has been chosen
        self.convergence = convergence
        self._size = None   # cached by get_size()

    def _get_encryption_key_convergent(self):
        """
        Return a Deferred firing with the key obtained by feeding the
        whole file through convergence_hasher. The key is cached.
        """
        if self._key is not None:
            return defer.succeed(self._key)

        def _hash_plaintext(params):
            k, happy, n, segsize = params
            hasher = convergence_hasher(k, n, segsize, self.convergence)
            fh = self._filehandle
            fh.seek(0)
            BLOCKSIZE = 64*1024
            total_read = 0
            chunk = fh.read(BLOCKSIZE)
            while chunk:
                hasher.update(chunk)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                total_read += len(chunk)
                if self._status:
                    self._status.set_progress(0, float(total_read)/self._size)
                chunk = fh.read(BLOCKSIZE)
            # leave the handle rewound for whoever reads the plaintext next
            fh.seek(0)
            self._key = hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key

        # get_size() sets self._size as a side-effect; the progress
        # computation above depends on it
        d = self.get_size()
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        d.addCallback(_hash_plaintext)
        return d

    def _get_encryption_key_random(self):
        """Return a Deferred firing with a 16-byte os.urandom key, cached."""
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        """Return a Deferred firing with the key: convergent when a
        convergence string was given, random otherwise."""
        if self.convergence is None:
            return self._get_encryption_key_random()
        return self._get_encryption_key_convergent()

    def get_size(self):
        """Return a Deferred firing with the file size, measured by
        seeking to the end of the filehandle. The size is cached."""
        if self._size is not None:
            return defer.succeed(self._size)
        fh = self._filehandle
        fh.seek(0,2)    # whence=2: relative to the end of the file
        self._size = size = fh.tell()
        fh.seek(0)
        return defer.succeed(size)

    def read(self, length):
        """Return a Deferred firing with a one-element list holding up to
        'length' bytes from the current file position."""
        piece = self._filehandle.read(length)
        return defer.succeed([piece])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass
|
|
|
|
|
2007-07-20 01:21:44 +00:00
|
|
|
class FileName(FileHandle):
    """FileHandle that opens the named file itself, and so also closes it."""

    def __init__(self, filename, convergence):
        """
        Upload the data from the filename.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        # binary mode: the file contents are read back unmodified
        handle = open(filename, "rb")
        FileHandle.__init__(self, handle, convergence=convergence)

    def close(self):
        """Run FileHandle.close, then close the file opened in __init__."""
        FileHandle.close(self)
        handle = self._filehandle
        handle.close()
|
|
|
|
|
|
|
|
class Data(FileHandle):
    """FileHandle over an in-memory string, wrapped in a StringIO."""

    def __init__(self, data, convergence):
        """
        Upload the data from the data argument.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        buffered = StringIO(data)
        FileHandle.__init__(self, buffered, convergence=convergence)
|
2007-07-20 01:21:44 +00:00
|
|
|
|
2009-01-07 04:48:22 +00:00
|
|
|
class Uploader(service.MultiService, log.PrefixingLogMixin):
    """I am a service that allows file uploading. I am a service-child of the
    Client.
    """
    implements(IUploader)
    name = "uploader"
    # files of at most this many bytes are handled by LiteralUploader
    URI_LIT_SIZE_THRESHOLD = 55

    def __init__(self, helper_furl=None, stats_provider=None):
        """
        @param helper_furl: FURL of an upload helper to connect to when the
            service starts, or None
        @param stats_provider: optional object whose count(name, delta)
            is called for every upload
        """
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        self._helper = None     # set once a versioned helper is connected
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
        service.MultiService.__init__(self)

    def startService(self):
        service.MultiService.startService(self)
        furl = self._helper_furl
        if furl:
            self.parent.tub.connectTo(furl, self._got_helper)

    def _got_helper(self, helper):
        self.log("got helper connection, getting versions")
        # version information to use for a helper without get_version()
        fallback_version = {
            "http://allmydata.org/tahoe/protocols/helper/v1": { },
            "application-version": "unknown: no get_version()",
            }
        # NOTE(review): this Deferred is neither returned nor given an
        # errback here, so an InsufficientVersionError raised by
        # _got_versioned_helper is not handled in this method.
        d = add_version_to_remote_reference(helper, fallback_version)
        d.addCallback(self._got_versioned_helper)

    def _got_versioned_helper(self, helper):
        """Accept the helper if it advertises the v1 helper protocol."""
        needed = "http://allmydata.org/tahoe/protocols/helper/v1"
        if needed not in helper.version:
            raise InsufficientVersionError(needed, helper.version)
        self._helper = helper
        helper.notifyOnDisconnect(self._lost_helper)

    def _lost_helper(self):
        self._helper = None

    def get_helper_info(self):
        # return a tuple of (helper_furl_or_None, connected_bool)
        connected = bool(self._helper)
        return (self._helper_furl, connected)

    def upload(self, uploadable, history=None):
        """
        Upload 'uploadable', choosing the uploader by size and by whether
        a helper is connected.

        @param uploadable: anything adaptable to IUploadable; it is closed
            when the upload finishes, whether it succeeded or failed
        @param history: optional object whose add_upload() receives the
            upload status of non-literal uploads

        Returns a Deferred that will fire with the UploadResults instance.
        """
        assert self.parent
        assert self.running

        uploadable = IUploadable(uploadable)

        def _choose_uploader(size):
            params = self.parent.get_encoding_parameters()
            precondition(isinstance(params, dict), params)
            precondition("max_segment_size" in params, params)
            uploadable.set_default_encoding_parameters(params)

            stats = self.stats_provider
            if stats:
                stats.count('uploader.files_uploaded', 1)
                stats.count('uploader.bytes_uploaded', size)

            if size <= self.URI_LIT_SIZE_THRESHOLD:
                return LiteralUploader().start(uploadable)

            eu = EncryptAnUploadable(uploadable, self._parentmsgid)
            # d2 is created already fired; keep the statement order below
            # unchanged, since callbacks added to it may run right away
            d2 = defer.succeed(None)
            if self._helper:
                uploader = AssistedUploader(self._helper)
                d2.addCallback(lambda ignored: eu.get_storage_index())
                d2.addCallback(lambda si: uploader.start(eu, si))
            else:
                uploader = CHKUploader(self.parent.get_storage_broker(),
                                       self.parent._secret_holder)
                d2.addCallback(lambda ignored: uploader.start(eu))

            self._all_uploads[uploader] = None
            if history:
                history.add_upload(uploader.get_upload_status())

            def _add_readcap(uploadresults):
                # Generate the uri from the verifycap plus the key.
                def _store_readcap(key):
                    verifycap = uri.from_string(uploadresults.verifycapstr)
                    readcap = uri.CHKFileURI(key,
                                             verifycap.uri_extension_hash,
                                             verifycap.needed_shares,
                                             verifycap.total_shares,
                                             verifycap.size)
                    uploadresults.uri = readcap.to_string()
                    return uploadresults
                d3 = uploadable.get_encryption_key()
                d3.addCallback(_store_readcap)
                return d3
            d2.addCallback(_add_readcap)
            return d2

        def _close_uploadable(res):
            uploadable.close()
            return res

        d = uploadable.get_size()
        d.addCallback(_choose_uploader)
        d.addBoth(_close_uploadable)
        return d
|