# tahoe-lafs/src/allmydata/immutable/upload.py

import os, time, weakref, itertools
from zope.interface import implementer
from twisted.python import failure
from twisted.internet import defer
from twisted.application import service
from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
from allmydata.crypto import aes
from allmydata.util.hashutil import file_renewal_secret_hash, \
file_cancel_secret_hash, bucket_renewal_secret_hash, \
bucket_cancel_secret_hash, plaintext_hasher, \
storage_index_hash, plaintext_segment_hasher, convergence_hasher
from allmydata.util.deferredutil import timeout_call
from allmydata import hashtree, uri
from allmydata.storage.server import si_b2a
from allmydata.immutable import encode
from allmydata.util import base32, dictutil, idlib, log, mathutil
from allmydata.util.happinessutil import servers_of_happiness, \
merge_servers, failure_message
from allmydata.util.assertutil import precondition, _assert
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
NoServersError, InsufficientVersionError, UploadUnhappinessError, \
DEFAULT_MAX_SEGMENT_SIZE, IProgress, IPeerSelector
from allmydata.immutable import layout
from six.moves import cStringIO as StringIO
from .happiness_upload import share_placement, calculate_happiness
from ..util.eliotutil import (
log_call_deferred,
inline_callbacks,
)
from eliot import (
ActionType,
MessageType,
Field,
)
_TOTAL_SHARES = Field.for_types(
u"total_shares",
[int, long],
u"The total number of shares desired.",
)
def _serialize_peers(peers):
return sorted(base32.b2a(p) for p in peers)
_PEERS = Field(
u"peers",
_serialize_peers,
u"The read/write peers being considered.",
)
_READONLY_PEERS = Field(
u"readonly_peers",
_serialize_peers,
u"The read-only peers being considered.",
)
def _serialize_existing_shares(existing_shares):
return {
server: list(shares)
for (server, shares)
in existing_shares.iteritems()
}
_EXISTING_SHARES = Field(
u"existing_shares",
_serialize_existing_shares,
u"The shares that are believed to already have been placed.",
)
def _serialize_happiness_mappings(happiness_mappings):
return {
sharenum: base32.b2a(serverid)
for (sharenum, serverid)
in happiness_mappings.iteritems()
}
_HAPPINESS_MAPPINGS = Field(
u"happiness_mappings",
_serialize_happiness_mappings,
u"The computed happiness mapping for a particular upload.",
)
_HAPPINESS = Field.for_types(
u"happiness",
[int, long],
u"The computed happiness of a certain placement.",
)
_UPLOAD_TRACKERS = Field(
u"upload_trackers",
lambda trackers: list(
dict(
server=tracker.get_name(),
shareids=sorted(tracker.buckets.keys()),
)
for tracker
in trackers
),
u"Some servers which have agreed to hold some shares for us.",
)
_ALREADY_SERVERIDS = Field(
u"already_serverids",
lambda d: d,
u"Some servers which are already holding some shares that we were interested in uploading.",
)
LOCATE_ALL_SHAREHOLDERS = ActionType(
u"immutable:upload:locate-all-shareholders",
[],
[_UPLOAD_TRACKERS, _ALREADY_SERVERIDS],
u"Existing shareholders are being identified to plan upload actions.",
)
GET_SHARE_PLACEMENTS = MessageType(
u"immutable:upload:get-share-placements",
[_TOTAL_SHARES, _PEERS, _READONLY_PEERS, _EXISTING_SHARES, _HAPPINESS_MAPPINGS, _HAPPINESS],
u"Share placement is being computed for an upload.",
)
_EFFECTIVE_HAPPINESS = Field.for_types(
u"effective_happiness",
[int, long],
u"The computed happiness value of a share placement map.",
)
CONVERGED_HAPPINESS = MessageType(
u"immutable:upload:get-shareholders:converged-happiness",
[_EFFECTIVE_HAPPINESS],
u"The share placement algorithm has converged and placements efforts are complete.",
)
# this wants to live in storage, not here
class TooFullError(Exception):
pass
# HelperUploadResults are what we get from the Helper, and to retain
# backwards compatibility with old Helpers we can't change the format. We
# convert them into a local UploadResults upon receipt.
class HelperUploadResults(Copyable, RemoteCopy):
# note: don't change this string, it needs to match the value used on the
# helper, and it does *not* need to match the fully-qualified
# package/module/class name
typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
copytype = typeToCopy
# also, think twice about changing the shape of any existing attribute,
# because instances of this class are sent from the helper to its client,
# so changing this may break compatibility. Consider adding new fields
# instead of modifying existing ones.
def __init__(self):
self.timings = {} # dict of name to number of seconds
self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
self.file_size = None
self.ciphertext_fetched = None # how much the helper fetched
self.uri = None
self.preexisting_shares = None # count of shares already present
self.pushed_shares = None # count of shares we pushed
@implementer(IUploadResults)
class UploadResults(object):
def __init__(self, file_size,
ciphertext_fetched, # how much the helper fetched
preexisting_shares, # count of shares already present
pushed_shares, # count of shares we pushed
sharemap, # {shnum: set(server)}
servermap, # {server: set(shnum)}
timings, # dict of name to number of seconds
uri_extension_data,
uri_extension_hash,
verifycapstr):
self._file_size = file_size
self._ciphertext_fetched = ciphertext_fetched
self._preexisting_shares = preexisting_shares
self._pushed_shares = pushed_shares
self._sharemap = sharemap
self._servermap = servermap
self._timings = timings
self._uri_extension_data = uri_extension_data
self._uri_extension_hash = uri_extension_hash
self._verifycapstr = verifycapstr
def set_uri(self, uri):
self._uri = uri
def get_file_size(self):
return self._file_size
def get_uri(self):
return self._uri
def get_ciphertext_fetched(self):
return self._ciphertext_fetched
def get_preexisting_shares(self):
return self._preexisting_shares
def get_pushed_shares(self):
return self._pushed_shares
def get_sharemap(self):
return self._sharemap
def get_servermap(self):
return self._servermap
def get_timings(self):
return self._timings
def get_uri_extension_data(self):
return self._uri_extension_data
def get_verifycapstr(self):
return self._verifycapstr
# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
EXTENSION_SIZE = 1000
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.
def pretty_print_shnum_to_servers(s):
return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.iteritems() ])
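# For illustration (hypothetical node IDs): a sharemap such as
#   {0: [peer_a], 1: [peer_a, peer_b]}
# renders as "sh0: aaaaa, sh1: aaaaa+bbbbb", with each node ID
# abbreviated via idlib.shortnodeid_b2a.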
class ServerTracker(object):
def __init__(self, server,
sharesize, blocksize, num_segments, num_share_hashes,
storage_index,
bucket_renewal_secret, bucket_cancel_secret):
self._server = server
self.buckets = {} # k: shareid, v: IRemoteBucketWriter
self.sharesize = sharesize
wbp = layout.make_write_bucket_proxy(None, None, sharesize,
blocksize, num_segments,
num_share_hashes,
EXTENSION_SIZE)
self.wbp_class = wbp.__class__ # to create more of them
self.allocated_size = wbp.get_allocated_size()
self.blocksize = blocksize
self.num_segments = num_segments
self.num_share_hashes = num_share_hashes
self.storage_index = storage_index
self.renew_secret = bucket_renewal_secret
self.cancel_secret = bucket_cancel_secret
def __repr__(self):
return ("<ServerTracker for server %s and SI %s>"
% (self._server.get_name(), si_b2a(self.storage_index)[:5]))
def get_server(self):
return self._server
def get_serverid(self):
return self._server.get_serverid()
def get_name(self):
return self._server.get_name()
def query(self, sharenums):
storage_server = self._server.get_storage_server()
d = storage_server.allocate_buckets(
self.storage_index,
self.renew_secret,
self.cancel_secret,
sharenums,
self.allocated_size,
canary=Referenceable(),
)
d.addCallback(self._buckets_allocated)
return d
def ask_about_existing_shares(self):
storage_server = self._server.get_storage_server()
return storage_server.get_buckets(self.storage_index)
def _buckets_allocated(self, alreadygot_and_buckets):
#log.msg("%s._buckets_allocated(%s)" % (self, alreadygot_and_buckets))
(alreadygot, buckets) = alreadygot_and_buckets
b = {}
for sharenum, rref in buckets.iteritems():
bp = self.wbp_class(rref, self._server, self.sharesize,
self.blocksize,
self.num_segments,
self.num_share_hashes,
EXTENSION_SIZE)
b[sharenum] = bp
self.buckets.update(b)
return (alreadygot, set(b.keys()))
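# For example (hypothetical response): an allocate_buckets() answer of
# (set([0]), {1: rref1, 2: rref2}) means the server already holds
# share 0 and has opened writers for shares 1 and 2, so
# _buckets_allocated() returns (set([0]), set([1, 2])).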
def abort(self):
"""
I abort the remote bucket writers for all shares. This is a good idea
to conserve space on the storage server.
"""
self.abort_some_buckets(self.buckets.keys())
def abort_some_buckets(self, sharenums):
"""
I abort the remote bucket writers for the share numbers in sharenums.
"""
for sharenum in sharenums:
if sharenum in self.buckets:
self.buckets[sharenum].abort()
del self.buckets[sharenum]
def str_shareloc(shnum, bucketwriter):
return "%s: %s" % (shnum, bucketwriter.get_servername(),)
@implementer(IPeerSelector)
class PeerSelector(object):
def __init__(self, num_segments, total_shares, needed_shares, min_happiness):
self.num_segments = num_segments
self.total_shares = total_shares
self.needed_shares = needed_shares
self.min_happiness = min_happiness
self.existing_shares = {}
self.peers = set()
self.readonly_peers = set()
self.bad_peers = set()
def add_peer_with_share(self, peerid, shnum):
try:
self.existing_shares[peerid].add(shnum)
except KeyError:
self.existing_shares[peerid] = set([shnum])
def add_peer(self, peerid):
self.peers.add(peerid)
def mark_readonly_peer(self, peerid):
self.readonly_peers.add(peerid)
self.peers.remove(peerid)
def mark_bad_peer(self, peerid):
if peerid in self.peers:
self.peers.remove(peerid)
self.bad_peers.add(peerid)
elif peerid in self.readonly_peers:
self.readonly_peers.remove(peerid)
self.bad_peers.add(peerid)
def get_sharemap_of_preexisting_shares(self):
preexisting = dictutil.DictOfSets()
for server, shares in self.existing_shares.iteritems():
for share in shares:
preexisting.add(share, server)
return preexisting
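# e.g. (hypothetical serverids) existing_shares of
# {A: set([0, 1]), B: set([1])} inverts to the sharemap
# {0: set([A]), 1: set([A, B])}.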
def get_share_placements(self):
shares = set(range(self.total_shares))
self.happiness_mappings = share_placement(self.peers, self.readonly_peers, shares, self.existing_shares)
self.happiness = calculate_happiness(self.happiness_mappings)
GET_SHARE_PLACEMENTS.log(
total_shares=self.total_shares,
peers=self.peers,
readonly_peers=self.readonly_peers,
existing_shares=self.existing_shares,
happiness_mappings=self.happiness_mappings,
happiness=self.happiness,
)
return self.happiness_mappings
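# A minimal sketch of the result (hypothetical serverids): with peers
# {A, B, C}, no read-only peers, total_shares=6 and no existing shares,
# share_placement() might yield {0: A, 1: B, 2: C, 3: A, 4: B, 5: C},
# for which calculate_happiness() is 3 -- the number of distinct
# servers matched with at least one share.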
class _QueryStatistics(object):
def __init__(self):
self.total = 0
self.good = 0
self.bad = 0
self.full = 0
self.error = 0
self.contacted = 0
def __str__(self):
return "QueryStatistics(total={} good={} bad={} full={} " \
"error={} contacted={})".format(
self.total,
self.good,
self.bad,
self.full,
self.error,
self.contacted,
)
class Tahoe2ServerSelector(log.PrefixingLogMixin):
def __init__(self, upload_id, logparent=None, upload_status=None, reactor=None):
self.upload_id = upload_id
self._query_stats = _QueryStatistics()
self.last_failure_msg = None
self._status = IUploadStatus(upload_status)
log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
self.log("starting", level=log.OPERATIONAL)
if reactor is None:
from twisted.internet import reactor
self._reactor = reactor
def __repr__(self):
return "<Tahoe2ServerSelector for upload %s>" % self.upload_id
def _create_trackers(self, candidate_servers, allocated_size,
file_renewal_secret, file_cancel_secret, create_server_tracker):
# filter the list of servers according to which ones can accommodate
# this request. This excludes older servers (which used a 4-byte size
# field) from getting large shares (for files larger than about
# 12GiB). See #439 for details.
def _get_maxsize(server):
v0 = server.get_version()
v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"]
return v1["maximum-immutable-share-size"]
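# The version dict is shaped roughly like (values vary per server):
#   {"http://allmydata.org/tahoe/protocols/storage/v1":
#        {"maximum-immutable-share-size": 2**32, ...},
#    "application-version": "..."}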
for server in candidate_servers:
self.peer_selector.add_peer(server.get_serverid())
writeable_servers = [
server for server in candidate_servers
if _get_maxsize(server) >= allocated_size
]
readonly_servers = set(candidate_servers) - set(writeable_servers)
for server in readonly_servers:
self.peer_selector.mark_readonly_peer(server.get_serverid())
def _make_trackers(servers):
trackers = []
for s in servers:
seed = s.get_lease_seed()
renew = bucket_renewal_secret_hash(file_renewal_secret, seed)
cancel = bucket_cancel_secret_hash(file_cancel_secret, seed)
st = create_server_tracker(s, renew, cancel)
trackers.append(st)
return trackers
write_trackers = _make_trackers(writeable_servers)
# We don't try to allocate shares to these servers, since they've
# said that they're incapable of storing shares of the size that we'd
# want to store. We ask them about existing shares for this storage
# index, which we want to know about for accurate
# servers_of_happiness accounting, then we forget about them.
readonly_trackers = _make_trackers(readonly_servers)
return readonly_trackers, write_trackers
@inline_callbacks
def get_shareholders(self, storage_broker, secret_holder,
storage_index, share_size, block_size,
num_segments, total_shares, needed_shares,
min_happiness):
"""
@return: (upload_trackers, already_serverids), where upload_trackers
is a set of ServerTracker instances that have agreed to hold
some shares for us (the shareids are stashed inside the
ServerTracker), and already_serverids is a dict mapping
shnum to a set of serverids for servers which claim to
already have the share.
"""
# re-initialize statistics
self._query_stats = _QueryStatistics()
if self._status:
self._status.set_status("Contacting Servers..")
self.peer_selector = PeerSelector(num_segments, total_shares,
needed_shares, min_happiness)
self.total_shares = total_shares
self.min_happiness = min_happiness
self.needed_shares = needed_shares
self.homeless_shares = set(range(total_shares))
self.use_trackers = set() # ServerTrackers that have shares assigned
# to them
self.preexisting_shares = {} # shareid => set(serverids) holding shareid
# These servers have shares -- any shares -- for our SI. We keep
# track of these to write an error message with them later.
self.serverids_with_shares = set()
# this needed_hashes computation should mirror
# Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
# (instead of a HashTree) because we don't require actual hashing
# just to count the levels.
ht = hashtree.IncompleteHashTree(total_shares)
num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
# figure out how much space to ask for
wbp = layout.make_write_bucket_proxy(None, None,
share_size, 0, num_segments,
num_share_hashes, EXTENSION_SIZE)
allocated_size = wbp.get_allocated_size()
# decide upon the renewal/cancel secrets, to include them in the
# allocate_buckets query.
file_renewal_secret = file_renewal_secret_hash(
secret_holder.get_renewal_secret(),
storage_index,
)
file_cancel_secret = file_cancel_secret_hash(
secret_holder.get_cancel_secret(),
storage_index,
)
# see docs/specifications/servers-of-happiness.rst
# 0. Start with an ordered list of servers. Maybe *2N* of them.
#
all_servers = storage_broker.get_servers_for_psi(storage_index)
if not all_servers:
raise NoServersError("client gave us zero servers")
def _create_server_tracker(server, renew, cancel):
return ServerTracker(
server, share_size, block_size, num_segments, num_share_hashes,
storage_index, renew, cancel,
)
readonly_trackers, write_trackers = self._create_trackers(
all_servers[:(2 * total_shares)],
allocated_size,
file_renewal_secret,
file_cancel_secret,
_create_server_tracker,
)
# see docs/specifications/servers-of-happiness.rst
# 1. Query all servers for existing shares.
#
# The spec doesn't say what to do for timeouts/errors. This
# adds a timeout to each request, and rejects any that reply
# with error (i.e. just removed from the list)
ds = []
if self._status and readonly_trackers:
self._status.set_status(
"Contacting readonly servers to find any existing shares"
)
# in the "pre servers-of-happiness" code, it was a little
# ambiguous whether "merely asking" counted as a "query" or
# not, because "allocate_buckets" with nothing to allocate was
# used to "ask" a write-able server what it held. Now we count
# "actual allocation queries" only, because those are the only
# things that actually affect what the server does.
for tracker in readonly_trackers:
assert isinstance(tracker, ServerTracker)
d = timeout_call(self._reactor, tracker.ask_about_existing_shares(), 15)
d.addBoth(self._handle_existing_response, tracker)
ds.append(d)
self.log("asking server %s for any existing shares" %
(tracker.get_name(),), level=log.NOISY)
for tracker in write_trackers:
assert isinstance(tracker, ServerTracker)
d = timeout_call(self._reactor, tracker.ask_about_existing_shares(), 15)
def timed_out(f, tracker):
# print("TIMEOUT {}: {}".format(tracker, f))
write_trackers.remove(tracker)
readonly_trackers.append(tracker)
return f
d.addErrback(timed_out, tracker)
d.addBoth(self._handle_existing_write_response, tracker, set())
ds.append(d)
self.log("asking server %s for any existing shares" %
(tracker.get_name(),), level=log.NOISY)
trackers = set(write_trackers) | set(readonly_trackers)
# these will always be (True, None) because errors are handled
# in the _handle_existing_write_response etc callbacks
yield defer.DeferredList(ds)
# okay, we've queried the 2N servers, time to get the share
# placements and attempt to actually place the shares (or
# renew them on read-only servers). We want to run the loop
# below *at least once* because even read-only servers won't
# renew their shares until "allocate_buckets" is called (via
# tracker.query())
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/778#comment:48
# min_happiness will be 0 for the repairer, so we set current
# effective_happiness to less than zero so this loop runs at
# least once for the repairer...
def _bad_server(fail, tracker):
self.last_failure_msg = fail
return False # will mark it readonly
def _make_readonly(tracker):
# print("making {} read-only".format(tracker.get_serverid()))
try:
write_trackers.remove(tracker)
except ValueError:
pass
# XXX can we just use a set() or does order matter?
if tracker not in readonly_trackers:
readonly_trackers.append(tracker)
return None
# so we *always* want to run this loop at least once, even if
# we only have read-only servers -- because asking them to
# allocate buckets renews those shares they already have. For
# subsequent loops, we give up if we've achieved happiness OR
# if we have zero writable servers left
last_happiness = None
effective_happiness = -1
while effective_happiness < min_happiness and \
(last_happiness is None or len(write_trackers)):
errors_before = self._query_stats.bad
self._share_placements = self.peer_selector.get_share_placements()
placements = []
for tracker in trackers:
shares_to_ask = self._allocation_for(tracker)
# if we already tried to upload share X to this very
# same server in a previous iteration, we should *not*
# ask again. If we *do* ask, there's no real harm, but
# the server will respond with an empty dict and that
# confuses our statistics. However, if the server is a
# readonly server, we *do* want to ask so it refreshes
# the share.
if shares_to_ask != set(tracker.buckets.keys()) or tracker in readonly_trackers:
self._query_stats.total += 1
self._query_stats.contacted += 1
d = timeout_call(self._reactor, tracker.query(shares_to_ask), 15)
d.addBoth(self._buckets_allocated, tracker, shares_to_ask)
d.addErrback(lambda f, tr: _bad_server(f, tr), tracker)
d.addCallback(lambda x, tr: _make_readonly(tr) if not x else x, tracker)
placements.append(d)
yield defer.DeferredList(placements)
merged = merge_servers(self.peer_selector.get_sharemap_of_preexisting_shares(), self.use_trackers)
effective_happiness = servers_of_happiness(merged)
if effective_happiness == last_happiness:
# print("effective happiness still {}".format(last_happiness))
# we haven't improved over the last iteration; give up
break
if errors_before == self._query_stats.bad:
break
last_happiness = effective_happiness
# print("write trackers left: {}".format(len(write_trackers)))
# note: peer_selector.get_allocations() only maps "things we
# uploaded in the above loop" and specifically does *not*
# include any pre-existing shares on read-only servers .. but
# we *do* want to count those shares towards total happiness.
# no more servers. If we haven't placed enough shares, we fail.
# XXX note: sometimes the loop above doesn't run even once, so
# 'merged' must be (re-)computed here.
merged = merge_servers(self.peer_selector.get_sharemap_of_preexisting_shares(), self.use_trackers)
effective_happiness = servers_of_happiness(merged)
# print("placements completed {} vs {}".format(effective_happiness, min_happiness))
# for k, v in merged.items():
# print(" {} -> {}".format(k, v))
CONVERGED_HAPPINESS.log(
effective_happiness=effective_happiness,
)
if effective_happiness < min_happiness:
msg = failure_message(
peer_count=len(self.serverids_with_shares),
k=self.needed_shares,
happy=min_happiness,
effective_happy=effective_happiness,
)
msg = ("server selection failed for %s: %s (%s), merged=%s" %
(self, msg, self._get_progress_message(),
pretty_print_shnum_to_servers(merged)))
if self.last_failure_msg:
msg += " (%s)" % (self.last_failure_msg,)
self.log(msg, level=log.UNUSUAL)
self._failed(msg) # raises UploadUnhappinessError
return
# we placed (or already had) enough to be happy, so we're done
if self._status:
self._status.set_status("Placed all shares")
msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
"self.use_trackers: %s, self.preexisting_shares: %s") \
% (self, self._get_progress_message(),
pretty_print_shnum_to_servers(merged),
[', '.join([str_shareloc(k,v)
for k,v in st.buckets.iteritems()])
for st in self.use_trackers],
pretty_print_shnum_to_servers(self.preexisting_shares))
self.log(msg, level=log.OPERATIONAL)
defer.returnValue((self.use_trackers, self.peer_selector.get_sharemap_of_preexisting_shares()))
def _handle_existing_response(self, res, tracker):
"""
I handle responses to the queries sent by
Tahoe2ServerSelector.get_shareholders.
"""
serverid = tracker.get_serverid()
if isinstance(res, failure.Failure):
self.log("%s got error during existing shares check: %s"
% (tracker.get_name(), res), level=log.UNUSUAL)
self.peer_selector.mark_bad_peer(serverid)
else:
buckets = res
if buckets:
self.serverids_with_shares.add(serverid)
self.log("response to get_buckets() from server %s: alreadygot=%s"
% (tracker.get_name(), tuple(sorted(buckets))),
level=log.NOISY)
for bucket in buckets:
self.peer_selector.add_peer_with_share(serverid, bucket)
self.preexisting_shares.setdefault(bucket, set()).add(serverid)
self.homeless_shares.discard(bucket)
def _handle_existing_write_response(self, res, tracker, shares_to_ask):
"""
I handle the response from a write server when asked about the
shares it already holds.
"""
if isinstance(res, failure.Failure):
self.peer_selector.mark_bad_peer(tracker.get_serverid())
self.log("%s got error during server selection: %s" % (tracker, res),
level=log.UNUSUAL)
self.homeless_shares |= shares_to_ask
msg = ("last failure (from %s) was: %s" % (tracker, res))
self.last_failure_msg = msg
else:
for share in res.keys():
self.peer_selector.add_peer_with_share(tracker.get_serverid(), share)
def _get_progress_message(self):
if not self.homeless_shares:
msg = "placed all %d shares, " % (self.total_shares)
else:
msg = ("placed %d shares out of %d total (%d homeless), " %
(self.total_shares - len(self.homeless_shares),
self.total_shares,
len(self.homeless_shares)))
assert self._query_stats.bad == (self._query_stats.full + self._query_stats.error)
return (
msg + "want to place shares on at least {happy} servers such that "
"any {needed} of them have enough shares to recover the file, "
"sent {queries} queries to {servers} servers, "
"{good} queries placed some shares, {bad} placed none "
"(of which {full} placed none due to the server being"
" full and {error} placed none due to an error)".format(
happy=self.min_happiness,
needed=self.needed_shares,
queries=self._query_stats.total,
servers=self._query_stats.contacted,
good=self._query_stats.good,
bad=self._query_stats.bad,
full=self._query_stats.full,
error=self._query_stats.error
)
)
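# An illustrative (made-up) rendering of the message above:
#   "placed 8 shares out of 10 total (2 homeless), want to place shares
#   on at least 7 servers such that any 3 of them have enough shares to
#   recover the file, sent 12 queries to 8 servers, 6 queries placed
#   some shares, 6 placed none (of which 4 placed none due to the
#   server being full and 2 placed none due to an error)"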
def _allocation_for(self, tracker):
"""
Given a ServerTracker, return a list of shares that we should
store on that server.
"""
assert isinstance(tracker, ServerTracker)
shares_to_ask = set()
servermap = self._share_placements
for shnum, tracker_id in servermap.items():
if tracker_id is None:
continue
if tracker.get_serverid() == tracker_id:
shares_to_ask.add(shnum)
if shnum in self.homeless_shares:
self.homeless_shares.remove(shnum)
if self._status:
self._status.set_status("Contacting Servers [%s] (first query),"
" %d shares left.."
% (tracker.get_name(),
len(self.homeless_shares)))
return shares_to_ask
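# For example (hypothetical serverids): with placements
# {0: A, 1: B, 2: A, 3: None}, a tracker for server A is asked for
# set([0, 2]), a tracker for B for set([1]), and share 3 stays homeless.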
def _buckets_allocated(self, res, tracker, shares_to_ask):
"""
Internal helper. If this returns an error or False, the server
will be considered read-only for any future iterations.
"""
if isinstance(res, failure.Failure):
# This is unusual, and probably indicates a bug or a network
# problem.
self.log("%s got error during server selection: %s" % (tracker, res),
level=log.UNUSUAL)
self._query_stats.error += 1
self._query_stats.bad += 1
self.homeless_shares |= shares_to_ask
try:
self.peer_selector.mark_readonly_peer(tracker.get_serverid())
except KeyError:
pass
return res
else:
(alreadygot, allocated) = res
self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s"
% (tracker.get_name(),
tuple(sorted(alreadygot)), tuple(sorted(allocated))),
level=log.NOISY)
progress = False
for s in alreadygot:
self.preexisting_shares.setdefault(s, set()).add(tracker.get_serverid())
if s in self.homeless_shares:
self.homeless_shares.remove(s)
progress = True
elif s in shares_to_ask:
progress = True
# the ServerTracker will remember which shares were allocated on
# that peer. We just have to remember to use them.
if allocated:
self.use_trackers.add(tracker)
progress = True
if allocated or alreadygot:
self.serverids_with_shares.add(tracker.get_serverid())
not_yet_present = set(shares_to_ask) - set(alreadygot)
still_homeless = not_yet_present - set(allocated)
if still_homeless:
# In networks with lots of space, this is very unusual and
# probably indicates an error. In networks with servers that
# are full, it is merely unusual. In networks that are very
# full, it is common, and many uploads will fail. In most
# cases, this is obviously not fatal, and we'll just use some
# other servers.
# some shares are still homeless, keep trying to find them a
# home. The ones that were rejected get first priority.
self.homeless_shares |= still_homeless
# Since they were unable to accept all of our requests, it is
# safe to assume that asking them again won't help.
if progress:
# They accepted at least one of the shares that we asked
# them to accept, or they had a share that we didn't ask
# them to accept but that we hadn't placed yet, so this
# was a productive query
self._query_stats.good += 1
else:
# if we asked for some allocations, but the server
# didn't return any at all (i.e. empty dict) it must
# be full
self._query_stats.full += 1
self._query_stats.bad += 1
return progress
def _failed(self, msg):
"""
I am called when server selection fails. I first abort all of the
remote buckets that I allocated during my unsuccessful attempt to
place shares for this file. I then raise an
UploadUnhappinessError with my msg argument.
"""
for tracker in self.use_trackers:
assert isinstance(tracker, ServerTracker)
tracker.abort()
raise UploadUnhappinessError(msg)
@implementer(IEncryptedUploadable)
class EncryptAnUploadable(object):
"""This is a wrapper that takes an IUploadable and provides
IEncryptedUploadable."""
CHUNKSIZE = 50*1024
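# A hedged usage sketch (FileHandle is defined later in this module);
# the uploadable must have its default encoding parameters set before
# wrapping, per the precondition below:
#   u = FileHandle(open("file.dat", "rb"), convergence=None)
#   u.set_default_encoding_parameters({"k": 3, "happy": 7, "n": 10,
#       "max_segment_size": DEFAULT_MAX_SEGMENT_SIZE})
#   eu = EncryptAnUploadable(u)
#   d = eu.get_storage_index()  # fires with the 16-byte SI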
def __init__(self, original, log_parent=None, progress=None):
precondition(original.default_params_set,
"set_default_encoding_parameters not called on %r before wrapping with EncryptAnUploadable" % (original,))
self.original = IUploadable(original)
self._log_number = log_parent
self._encryptor = None
self._plaintext_hasher = plaintext_hasher()
self._plaintext_segment_hasher = None
self._plaintext_segment_hashes = []
self._encoding_parameters = None
self._file_size = None
self._ciphertext_bytes_read = 0
self._status = None
self._progress = progress
def set_upload_status(self, upload_status):
self._status = IUploadStatus(upload_status)
self.original.set_upload_status(upload_status)
def log(self, *args, **kwargs):
if "facility" not in kwargs:
kwargs["facility"] = "upload.encryption"
if "parent" not in kwargs:
kwargs["parent"] = self._log_number
return log.msg(*args, **kwargs)
def get_size(self):
if self._file_size is not None:
return defer.succeed(self._file_size)
d = self.original.get_size()
def _got_size(size):
self._file_size = size
if self._status:
self._status.set_size(size)
if self._progress:
self._progress.set_progress_total(size)
return size
d.addCallback(_got_size)
return d
def get_all_encoding_parameters(self):
if self._encoding_parameters is not None:
return defer.succeed(self._encoding_parameters)
d = self.original.get_all_encoding_parameters()
def _got(encoding_parameters):
(k, happy, n, segsize) = encoding_parameters
self._segment_size = segsize # used by segment hashers
self._encoding_parameters = encoding_parameters
self.log("my encoding parameters: %s" % (encoding_parameters,),
level=log.NOISY)
return encoding_parameters
d.addCallback(_got)
return d
def _get_encryptor(self):
if self._encryptor:
return defer.succeed(self._encryptor)
d = self.original.get_encryption_key()
def _got(key):
self._encryptor = aes.create_encryptor(key)
storage_index = storage_index_hash(key)
assert isinstance(storage_index, str)
# There's no point to having the SI be longer than the key, so we
# specify that it is truncated to the same 128 bits as the AES key.
assert len(storage_index) == 16 # SHA-256 truncated to 128b
self._storage_index = storage_index
if self._status:
self._status.set_storage_index(storage_index)
return self._encryptor
d.addCallback(_got)
return d
def get_storage_index(self):
d = self._get_encryptor()
d.addCallback(lambda res: self._storage_index)
return d
def _get_segment_hasher(self):
p = self._plaintext_segment_hasher
if p:
left = self._segment_size - self._plaintext_segment_hashed_bytes
return p, left
p = plaintext_segment_hasher()
self._plaintext_segment_hasher = p
self._plaintext_segment_hashed_bytes = 0
return p, self._segment_size
def _update_segment_hash(self, chunk):
offset = 0
while offset < len(chunk):
p, segment_left = self._get_segment_hasher()
chunk_left = len(chunk) - offset
this_segment = min(chunk_left, segment_left)
p.update(chunk[offset:offset+this_segment])
self._plaintext_segment_hashed_bytes += this_segment
if self._plaintext_segment_hashed_bytes == self._segment_size:
# we've filled this segment
self._plaintext_segment_hashes.append(p.digest())
self._plaintext_segment_hasher = None
self.log("closed hash [%d]: %dB" %
(len(self._plaintext_segment_hashes)-1,
self._plaintext_segment_hashed_bytes),
level=log.NOISY)
self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
segnum=len(self._plaintext_segment_hashes)-1,
hash=base32.b2a(p.digest()),
level=log.NOISY)
offset += this_segment
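# For example, with a hypothetical segment size of 3 bytes, feeding the
# 5-byte chunk "abcde" hashes "abc" into segment hasher 0 (closing it)
# and "de" into hasher 1, which stays open until one more byte arrives.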
def read_encrypted(self, length, hash_only):
# make sure our parameters have been set up first
d = self.get_all_encoding_parameters()
# and size
d.addCallback(lambda ignored: self.get_size())
d.addCallback(lambda ignored: self._get_encryptor())
# then fetch and encrypt the plaintext. The unusual structure here
# (passing a Deferred *into* a function) is needed to avoid
# overflowing the stack: Deferreds don't optimize out tail recursion.
# We also pass in a list, to which _read_encrypted will append
# ciphertext.
ciphertext = []
d2 = defer.Deferred()
d.addCallback(lambda ignored:
self._read_encrypted(length, ciphertext, hash_only, d2))
d.addCallback(lambda ignored: d2)
return d
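# The pattern above, in miniature (hypothetical names): a naive
#   def loop(n): return read().addCallback(lambda _: loop(n - 1))
# chains one Deferred per step and can exhaust the stack when read()
# fires synchronously; handing a single 'fire_when_done' Deferred into
# the loop and stalling each step with fireEventually() keeps the
# callback chain flat. See _read_encrypted below.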
def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
if not remaining:
fire_when_done.callback(ciphertext)
return None
# tolerate large length= values without consuming a lot of RAM by
# reading just a chunk (say 50kB) at a time. This only really matters
# when hash_only==True (i.e. resuming an interrupted upload), since
# that's the case where we will be skipping over a lot of data.
size = min(remaining, self.CHUNKSIZE)
remaining = remaining - size
# read a chunk of plaintext..
d = defer.maybeDeferred(self.original.read, size)
# N.B.: if read() is synchronous, then since everything else is
# actually synchronous too, we'd blow the stack unless we stall for a
# tick. Once you accept a Deferred from IUploadable.read(), you must
# be prepared to have it fire immediately too.
d.addCallback(fireEventually)
def _good(plaintext):
# and encrypt it..
# o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
ciphertext.extend(ct)
self._read_encrypted(remaining, ciphertext, hash_only,
fire_when_done)
def _err(why):
fire_when_done.errback(why)
d.addCallback(_good)
d.addErrback(_err)
return None
def _hash_and_encrypt_plaintext(self, data, hash_only):
assert isinstance(data, (tuple, list)), type(data)
data = list(data)
cryptdata = []
# we use data.pop(0) instead of 'for chunk in data' to save
# memory: each chunk is destroyed as soon as we're done with it.
bytes_processed = 0
while data:
chunk = data.pop(0)
self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
level=log.NOISY)
bytes_processed += len(chunk)
self._plaintext_hasher.update(chunk)
self._update_segment_hash(chunk)
# TODO: we have to encrypt the data (even if hash_only==True)
# because the AES-CTR implementation doesn't offer a
# way to change the counter value. Once it acquires
# this ability, change this to simply update the counter
# before each call to (hash_only==False) encrypt_data
ciphertext = aes.encrypt_data(self._encryptor, chunk)
if hash_only:
self.log(" skipping encryption", level=log.NOISY)
else:
cryptdata.append(ciphertext)
del ciphertext
del chunk
self._ciphertext_bytes_read += bytes_processed
if self._status:
progress = float(self._ciphertext_bytes_read) / self._file_size
self._status.set_progress(1, progress)
return cryptdata
def get_plaintext_hashtree_leaves(self, first, last, num_segments):
# this is currently unused, but will live again when we fix #453
if len(self._plaintext_segment_hashes) < num_segments:
# close out the last one
assert len(self._plaintext_segment_hashes) == num_segments-1
p, segment_left = self._get_segment_hasher()
self._plaintext_segment_hashes.append(p.digest())
del self._plaintext_segment_hasher
self.log("closing plaintext leaf hasher, hashed %d bytes" %
self._plaintext_segment_hashed_bytes,
level=log.NOISY)
self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
segnum=len(self._plaintext_segment_hashes)-1,
hash=base32.b2a(p.digest()),
level=log.NOISY)
assert len(self._plaintext_segment_hashes) == num_segments
return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
def get_plaintext_hash(self):
h = self._plaintext_hasher.digest()
return defer.succeed(h)
def close(self):
return self.original.close()
@implementer(IUploadStatus)
class UploadStatus(object):
statusid_counter = itertools.count(0)
def __init__(self):
self.storage_index = None
self.size = None
self.helper = False
self.status = "Not started"
self.progress = [0.0, 0.0, 0.0]
self.active = True
self.results = None
self.counter = self.statusid_counter.next()
self.started = time.time()
def get_started(self):
return self.started
def get_storage_index(self):
return self.storage_index
def get_size(self):
return self.size
def using_helper(self):
return self.helper
def get_status(self):
return self.status
def get_progress(self):
return tuple(self.progress)
def get_active(self):
return self.active
def get_results(self):
return self.results
def get_counter(self):
return self.counter
def set_storage_index(self, si):
self.storage_index = si
def set_size(self, size):
self.size = size
def set_helper(self, helper):
self.helper = helper
def set_status(self, status):
self.status = status
def set_progress(self, which, value):
# [0]: chk, [1]: ciphertext, [2]: encode+push
self.progress[which] = value
def set_active(self, value):
self.active = value
def set_results(self, value):
self.results = value
class CHKUploader(object):
def __init__(self, storage_broker, secret_holder, progress=None, reactor=None):
# server_selector needs storage_broker and secret_holder
self._storage_broker = storage_broker
self._secret_holder = secret_holder
self._log_number = self.log("CHKUploader starting", parent=None)
self._encoder = None
self._storage_index = None
self._upload_status = UploadStatus()
self._upload_status.set_helper(False)
self._upload_status.set_active(True)
self._progress = progress
self._reactor = reactor
# locate_all_shareholders() will create the following attribute:
# self._server_trackers = {} # k: shnum, v: instance of ServerTracker
def log(self, *args, **kwargs):
if "parent" not in kwargs:
kwargs["parent"] = self._log_number
if "facility" not in kwargs:
kwargs["facility"] = "tahoe.upload"
return log.msg(*args, **kwargs)
@log_call_deferred(action_type=u"immutable:upload:chk:start")
def start(self, encrypted_uploadable):
"""Start uploading the file.
Returns a Deferred that will fire with the UploadResults instance.
"""
self._started = time.time()
eu = IEncryptedUploadable(encrypted_uploadable)
self.log("starting upload of %s" % eu)
eu.set_upload_status(self._upload_status)
d = self.start_encrypted(eu)
def _done(uploadresults):
self._upload_status.set_active(False)
return uploadresults
d.addBoth(_done)
return d
def abort(self):
"""Call this if the upload must be abandoned before it completes.
This will tell the shareholders to delete their partial shares. I
return a Deferred that fires when these messages have been acked."""
if not self._encoder:
# how did you call abort() before calling start() ?
return defer.succeed(None)
return self._encoder.abort()
@log_call_deferred(action_type=u"immutable:upload:chk:start-encrypted")
@inline_callbacks
def start_encrypted(self, encrypted):
"""
Returns a Deferred that will fire with the UploadResults instance.
"""
eu = IEncryptedUploadable(encrypted)
started = time.time()
# would be Really Nice to make Encoder just a local; only
# abort() really needs self._encoder ...
self._encoder = encode.Encoder(
self._log_number,
self._upload_status,
progress=self._progress,
)
        # set_encrypted_uploadable() just returns the encoder itself
yield self._encoder.set_encrypted_uploadable(eu)
with LOCATE_ALL_SHAREHOLDERS() as action:
(upload_trackers, already_serverids) = yield self.locate_all_shareholders(self._encoder, started)
action.add_success_fields(upload_trackers=upload_trackers, already_serverids=already_serverids)
self.set_shareholders(upload_trackers, already_serverids, self._encoder)
verifycap = yield self._encoder.start()
results = self._encrypted_done(verifycap)
defer.returnValue(results)
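    # In outline, start_encrypted() runs the whole CHK pipeline: wire the
    # encrypted uploadable into the Encoder, pick servers
    # (locate_all_shareholders), hand the chosen buckets to the Encoder
    # (set_shareholders), then let the Encoder push blocks and hashes until
    # it produces a verifycap, which _encrypted_done() folds into the
    # UploadResults.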
def locate_all_shareholders(self, encoder, started):
server_selection_started = now = time.time()
self._storage_index_elapsed = now - started
storage_broker = self._storage_broker
secret_holder = self._secret_holder
storage_index = encoder.get_param("storage_index")
self._storage_index = storage_index
upload_id = si_b2a(storage_index)[:5]
self.log("using storage index %s" % upload_id)
server_selector = Tahoe2ServerSelector(
upload_id,
self._log_number,
self._upload_status,
reactor=self._reactor,
)
share_size = encoder.get_param("share_size")
block_size = encoder.get_param("block_size")
num_segments = encoder.get_param("num_segments")
k, desired, n = encoder.get_param("share_counts")
self._server_selection_started = time.time()
d = server_selector.get_shareholders(storage_broker, secret_holder,
storage_index,
share_size, block_size,
num_segments, n, k, desired)
def _done(res):
self._server_selection_elapsed = time.time() - server_selection_started
return res
d.addCallback(_done)
return d
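    # To make the selection parameters concrete: with the stock 3-of-10
    # encoding and happy=7 (defaults, not anything this method hard-codes),
    # the selector must place the 10 shares so that at least 7 distinct
    # servers each hold at least one distinct share (happiness is measured
    # as a bipartite server-to-share matching); otherwise the upload fails
    # with UploadUnhappinessError.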
def set_shareholders(self, upload_trackers, already_serverids, encoder):
"""
:param upload_trackers: a sequence of ServerTracker objects that
have agreed to hold some shares for us (the
shareids are stashed inside the ServerTracker)
:param already_serverids: a dict mapping sharenum to a set of
serverids for servers that claim to already
have this share
"""
msgtempl = "set_shareholders; upload_trackers is %s, already_serverids is %s"
values = ([', '.join([str_shareloc(k,v)
for k,v in st.buckets.iteritems()])
for st in upload_trackers], already_serverids)
self.log(msgtempl % values, level=log.OPERATIONAL)
# record already-present shares in self._results
self._count_preexisting_shares = len(already_serverids)
self._server_trackers = {} # k: shnum, v: instance of ServerTracker
for tracker in upload_trackers:
assert isinstance(tracker, ServerTracker)
buckets = {}
servermap = already_serverids.copy()
for tracker in upload_trackers:
buckets.update(tracker.buckets)
for shnum in tracker.buckets:
self._server_trackers[shnum] = tracker
servermap.setdefault(shnum, set()).add(tracker.get_serverid())
assert len(buckets) == sum([len(tracker.buckets)
for tracker in upload_trackers]), \
"%s (%s) != %s (%s)" % (
len(buckets),
buckets,
sum([len(tracker.buckets) for tracker in upload_trackers]),
[(t.buckets, t.get_serverid()) for t in upload_trackers]
)
encoder.set_shareholders(buckets, servermap)
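    # A worked example of the merge above, with hypothetical values: if
    # tracker A (server SA) holds buckets for shares {0, 1}, tracker B
    # (server SB) holds {2}, and already_serverids says share 3 lives on
    # server SC, then buckets covers shares 0-2 and servermap becomes
    # {0: {SA}, 1: {SA}, 2: {SB}, 3: {SC}}. The encoder writes blocks only
    # into buckets, but uses the fuller servermap when it re-checks
    # happiness after a server failure.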
def _encrypted_done(self, verifycap):
"""
:return UploadResults: A description of the outcome of the upload.
"""
e = self._encoder
sharemap = dictutil.DictOfSets()
servermap = dictutil.DictOfSets()
for shnum in e.get_shares_placed():
server = self._server_trackers[shnum].get_server()
sharemap.add(shnum, server)
servermap.add(server, shnum)
now = time.time()
timings = {}
timings["total"] = now - self._started
timings["storage_index"] = self._storage_index_elapsed
timings["peer_selection"] = self._server_selection_elapsed
timings.update(e.get_times())
ur = UploadResults(file_size=e.file_size,
ciphertext_fetched=0,
preexisting_shares=self._count_preexisting_shares,
pushed_shares=len(e.get_shares_placed()),
sharemap=sharemap,
servermap=servermap,
timings=timings,
uri_extension_data=e.get_uri_extension_data(),
uri_extension_hash=e.get_uri_extension_hash(),
verifycapstr=verifycap.to_string())
self._upload_status.set_results(ur)
return ur
def get_upload_status(self):
return self._upload_status
def read_this_many_bytes(uploadable, size, prepend_data=[]):
if size == 0:
return defer.succeed([])
d = uploadable.read(size)
def _got(data):
assert isinstance(data, list)
bytes = sum([len(piece) for piece in data])
assert bytes > 0
assert bytes <= size
remaining = size - bytes
if remaining:
return read_this_many_bytes(uploadable, remaining,
prepend_data + data)
return prepend_data + data
d.addCallback(_got)
return d
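# An illustrative sketch of the recursion above (FakeUploadable is
# hypothetical, not part of this module): read() may legally return fewer
# bytes than requested, so the helper keeps calling itself with the
# shortfall until the accumulated pieces add up to 'size'.
#
#   class FakeUploadable(object):
#       def __init__(self, data, chunk_size=4):
#           self._f, self._chunk_size = StringIO(data), chunk_size
#       def read(self, length):
#           # return at most chunk_size bytes per call, to force recursion
#           return defer.succeed([self._f.read(min(length, self._chunk_size))])
#
#   d = read_this_many_bytes(FakeUploadable("hello world"), 11)
#   d.addCallback(lambda pieces: "".join(pieces))  # fires with "hello world"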
class LiteralUploader(object):
def __init__(self, progress=None):
self._status = s = UploadStatus()
s.set_storage_index(None)
s.set_helper(False)
s.set_progress(0, 1.0)
s.set_active(False)
self._progress = progress
def start(self, uploadable):
uploadable = IUploadable(uploadable)
d = uploadable.get_size()
def _got_size(size):
self._size = size
self._status.set_size(size)
if self._progress:
self._progress.set_progress_total(size)
return read_this_many_bytes(uploadable, size)
d.addCallback(_got_size)
d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
d.addCallback(lambda u: u.to_string())
d.addCallback(self._build_results)
return d
def _build_results(self, uri):
ur = UploadResults(file_size=self._size,
ciphertext_fetched=0,
preexisting_shares=0,
pushed_shares=0,
sharemap={},
servermap={},
timings={},
uri_extension_data=None,
uri_extension_hash=None,
verifycapstr=None)
ur.set_uri(uri)
self._status.set_status("Finished")
self._status.set_progress(1, 1.0)
self._status.set_progress(2, 1.0)
self._status.set_results(ur)
if self._progress:
self._progress.set_progress(self._size)
return ur
def close(self):
pass
def get_upload_status(self):
return self._status
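# For scale: LiteralUploader never touches storage servers, because a LIT
# URI embeds the whole file (base32-encoded) in the cap itself; for
# example, uri.LiteralFileURI("hello").to_string() yields something like
# "URI:LIT:nbswy3dp". Uploader.upload() below only takes this path for
# files of at most URI_LIT_SIZE_THRESHOLD bytes.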
@implementer(RIEncryptedUploadable)
class RemoteEncryptedUploadable(Referenceable):
def __init__(self, encrypted_uploadable, upload_status):
self._eu = IEncryptedUploadable(encrypted_uploadable)
self._offset = 0
self._bytes_sent = 0
self._status = IUploadStatus(upload_status)
# we are responsible for updating the status string while we run, and
# for setting the ciphertext-fetch progress.
self._size = None
def get_size(self):
if self._size is not None:
return defer.succeed(self._size)
d = self._eu.get_size()
def _got_size(size):
self._size = size
return size
d.addCallback(_got_size)
return d
def remote_get_size(self):
return self.get_size()
def remote_get_all_encoding_parameters(self):
return self._eu.get_all_encoding_parameters()
def _read_encrypted(self, length, hash_only):
d = self._eu.read_encrypted(length, hash_only)
def _read(strings):
if hash_only:
self._offset += length
else:
size = sum([len(data) for data in strings])
self._offset += size
return strings
d.addCallback(_read)
return d
def remote_read_encrypted(self, offset, length):
# we don't support seek backwards, but we allow skipping forwards
precondition(offset >= 0, offset)
precondition(length >= 0, length)
lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
level=log.NOISY)
precondition(offset >= self._offset, offset, self._offset)
if offset > self._offset:
            # read the skipped data from disk anyway, to build up the hash tree
skip = offset - self._offset
log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
(self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
d = self._read_encrypted(skip, hash_only=True)
else:
d = defer.succeed(None)
def _at_correct_offset(res):
assert offset == self._offset, "%d != %d" % (offset, self._offset)
return self._read_encrypted(length, hash_only=False)
d.addCallback(_at_correct_offset)
def _read(strings):
size = sum([len(data) for data in strings])
self._bytes_sent += size
return strings
d.addCallback(_read)
return d
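    # A hypothetical call sequence, to illustrate: after
    # remote_read_encrypted(0, 100) the cursor sits at offset 100. A later
    # remote_read_encrypted(300, 50) first does a hash_only read of the 200
    # skipped bytes (so the hash trees still cover every byte) and then the
    # real 50-byte read; asking for any offset below 100 would fail the
    # precondition above.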
def remote_close(self):
return self._eu.close()
class AssistedUploader(object):
def __init__(self, helper, storage_broker):
self._helper = helper
self._storage_broker = storage_broker
self._log_number = log.msg("AssistedUploader starting")
self._storage_index = None
self._upload_status = s = UploadStatus()
s.set_helper(True)
s.set_active(True)
def log(self, *args, **kwargs):
if "parent" not in kwargs:
kwargs["parent"] = self._log_number
return log.msg(*args, **kwargs)
def start(self, encrypted_uploadable, storage_index):
"""Start uploading the file.
Returns a Deferred that will fire with the UploadResults instance.
"""
precondition(isinstance(storage_index, str), storage_index)
self._started = time.time()
eu = IEncryptedUploadable(encrypted_uploadable)
eu.set_upload_status(self._upload_status)
self._encuploadable = eu
self._storage_index = storage_index
d = eu.get_size()
d.addCallback(self._got_size)
d.addCallback(lambda res: eu.get_all_encoding_parameters())
d.addCallback(self._got_all_encoding_parameters)
d.addCallback(self._contact_helper)
d.addCallback(self._build_verifycap)
def _done(res):
self._upload_status.set_active(False)
return res
d.addBoth(_done)
return d
def _got_size(self, size):
self._size = size
self._upload_status.set_size(size)
def _got_all_encoding_parameters(self, params):
k, happy, n, segment_size = params
# stash these for URI generation later
self._needed_shares = k
self._total_shares = n
self._segment_size = segment_size
def _contact_helper(self, res):
now = self._time_contacting_helper_start = time.time()
self._storage_index_elapsed = now - self._started
self.log(format="contacting helper for SI %(si)s..",
si=si_b2a(self._storage_index), level=log.NOISY)
self._upload_status.set_status("Contacting Helper")
d = self._helper.callRemote("upload_chk", self._storage_index)
d.addCallback(self._contacted_helper)
return d
def _contacted_helper(self, helper_upload_results_and_upload_helper):
(helper_upload_results, upload_helper) = helper_upload_results_and_upload_helper
now = time.time()
elapsed = now - self._time_contacting_helper_start
self._elapsed_time_contacting_helper = elapsed
if upload_helper:
self.log("helper says we need to upload", level=log.NOISY)
self._upload_status.set_status("Uploading Ciphertext")
# we need to upload the file
reu = RemoteEncryptedUploadable(self._encuploadable,
self._upload_status)
# let it pre-compute the size for progress purposes
d = reu.get_size()
d.addCallback(lambda ignored:
upload_helper.callRemote("upload", reu))
# this Deferred will fire with the upload results
return d
self.log("helper says file is already uploaded", level=log.OPERATIONAL)
self._upload_status.set_progress(1, 1.0)
return helper_upload_results
def _convert_old_upload_results(self, upload_results):
# pre-1.3.0 helpers return upload results which contain a mapping
# from shnum to a single human-readable string, containing things
# like "Found on [x],[y],[z]" (for healthy files that were already in
# the grid), "Found on [x]" (for files that needed upload but which
# discovered pre-existing shares), and "Placed on [x]" (for newly
# uploaded shares). The 1.3.0 helper returns a mapping from shnum to
# set of binary serverid strings.
# the old results are too hard to deal with (they don't even contain
# as much information as the new results, since the nodeids are
# abbreviated), so if we detect old results, just clobber them.
sharemap = upload_results.sharemap
if str in [type(v) for v in sharemap.values()]:
upload_results.sharemap = None
def _build_verifycap(self, helper_upload_results):
self.log("upload finished, building readcap", level=log.OPERATIONAL)
self._convert_old_upload_results(helper_upload_results)
self._upload_status.set_status("Building Readcap")
hur = helper_upload_results
assert hur.uri_extension_data["needed_shares"] == self._needed_shares
assert hur.uri_extension_data["total_shares"] == self._total_shares
assert hur.uri_extension_data["segment_size"] == self._segment_size
assert hur.uri_extension_data["size"] == self._size
        # hur.verifycap is absent if the file was already in the grid, so
        # always rebuild the verifier cap locally from what we know
v = uri.CHKFileVerifierURI(self._storage_index,
uri_extension_hash=hur.uri_extension_hash,
needed_shares=self._needed_shares,
total_shares=self._total_shares,
size=self._size)
timings = {}
timings["storage_index"] = self._storage_index_elapsed
timings["contacting_helper"] = self._elapsed_time_contacting_helper
for key,val in hur.timings.items():
if key == "total":
key = "helper_total"
timings[key] = val
now = time.time()
timings["total"] = now - self._started
# Note: older Helpers (<=1.11) sent tubids as serverids. Newer ones
# send pubkeys. get_stub_server() knows how to map both into
# IDisplayableServer instances.
gss = self._storage_broker.get_stub_server
sharemap = {}
servermap = {}
for shnum, serverids in hur.sharemap.items():
sharemap[shnum] = set([gss(serverid) for serverid in serverids])
# if the file was already in the grid, hur.servermap is an empty dict
for serverid, shnums in hur.servermap.items():
servermap[gss(serverid)] = set(shnums)
ur = UploadResults(file_size=self._size,
                           # zero if the file was already in the grid
ciphertext_fetched=hur.ciphertext_fetched,
preexisting_shares=hur.preexisting_shares,
pushed_shares=hur.pushed_shares,
sharemap=sharemap,
servermap=servermap,
timings=timings,
uri_extension_data=hur.uri_extension_data,
uri_extension_hash=hur.uri_extension_hash,
verifycapstr=v.to_string())
self._upload_status.set_status("Finished")
self._upload_status.set_results(ur)
return ur
def get_upload_status(self):
return self._upload_status
class BaseUploadable(object):
    # max_segment_size, when set, overrides this default
default_max_segment_size = DEFAULT_MAX_SEGMENT_SIZE
default_params_set = False
max_segment_size = None
encoding_param_k = None
encoding_param_happy = None
encoding_param_n = None
_all_encoding_parameters = None
_status = None
def set_upload_status(self, upload_status):
self._status = IUploadStatus(upload_status)
def set_default_encoding_parameters(self, default_params):
assert isinstance(default_params, dict)
for k,v in default_params.items():
precondition(isinstance(k, str), k, v)
precondition(isinstance(v, int), k, v)
if "k" in default_params:
self.default_encoding_param_k = default_params["k"]
if "happy" in default_params:
self.default_encoding_param_happy = default_params["happy"]
if "n" in default_params:
self.default_encoding_param_n = default_params["n"]
if "max_segment_size" in default_params:
self.default_max_segment_size = default_params["max_segment_size"]
self.default_params_set = True
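    # For illustration, a typical default_params dict looks like
    #   {"k": 3, "happy": 7, "n": 10, "max_segment_size": 131072}
    # (the stock defaults, not values this class imposes). Explicit
    # per-upload attributes such as encoding_param_k, when set, win over
    # these defaults in get_all_encoding_parameters() below.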
def get_all_encoding_parameters(self):
_assert(self.default_params_set, "set_default_encoding_parameters not called on %r" % (self,))
if self._all_encoding_parameters:
return defer.succeed(self._all_encoding_parameters)
max_segsize = self.max_segment_size or self.default_max_segment_size
k = self.encoding_param_k or self.default_encoding_param_k
happy = self.encoding_param_happy or self.default_encoding_param_happy
n = self.encoding_param_n or self.default_encoding_param_n
d = self.get_size()
def _got_size(file_size):
# for small files, shrink the segment size to avoid wasting space
segsize = min(max_segsize, file_size)
# this must be a multiple of 'required_shares'==k
segsize = mathutil.next_multiple(segsize, k)
encoding_parameters = (k, happy, n, segsize)
self._all_encoding_parameters = encoding_parameters
return encoding_parameters
d.addCallback(_got_size)
return d
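# Worked example of the segment-size math above: for a 10000-byte file
# with k=3 and max_segment_size=131072, segsize starts as
# min(131072, 10000) == 10000 and is rounded up to the next multiple of k,
# i.e. mathutil.next_multiple(10000, 3) == 10002, so each of the k
# erasure-coding input blocks within a segment stays the same length.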
@implementer(IUploadable)
class FileHandle(BaseUploadable):
def __init__(self, filehandle, convergence):
"""
Upload the data from the filehandle. If convergence is None then a
random encryption key will be used, else the plaintext will be hashed,
then the hash will be hashed together with the string in the
"convergence" argument to form the encryption key.
"""
assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
self._filehandle = filehandle
self._key = None
self.convergence = convergence
self._size = None
def _get_encryption_key_convergent(self):
if self._key is not None:
return defer.succeed(self._key)
d = self.get_size()
# that sets self._size as a side-effect
d.addCallback(lambda size: self.get_all_encoding_parameters())
def _got(params):
k, happy, n, segsize = params
f = self._filehandle
enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
f.seek(0)
BLOCKSIZE = 64*1024
bytes_read = 0
while True:
data = f.read(BLOCKSIZE)
if not data:
break
enckey_hasher.update(data)
# TODO: setting progress in a non-yielding loop is kind of
# pointless, but I'm anticipating (perhaps prematurely) the
# day when we use a slowjob or twisted's CooperatorService to
# make this yield time to other jobs.
bytes_read += len(data)
if self._status:
self._status.set_progress(0, float(bytes_read)/self._size)
f.seek(0)
self._key = enckey_hasher.digest()
if self._status:
self._status.set_progress(0, 1.0)
assert len(self._key) == 16
return self._key
d.addCallback(_got)
return d
def _get_encryption_key_random(self):
if self._key is None:
self._key = os.urandom(16)
return defer.succeed(self._key)
def get_encryption_key(self):
if self.convergence is not None:
return self._get_encryption_key_convergent()
else:
return self._get_encryption_key_random()
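    # Convergence in brief: two uploads of identical plaintext with the
    # same convergence secret derive the same 16-byte AES key, hence the
    # same storage index, so the grid stores the shares only once; with
    # convergence=None every upload gets a fresh random key (and a new
    # storage index), trading deduplication for privacy.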
def get_size(self):
if self._size is not None:
return defer.succeed(self._size)
self._filehandle.seek(0, os.SEEK_END)
size = self._filehandle.tell()
self._size = size
self._filehandle.seek(0)
return defer.succeed(size)
def read(self, length):
return defer.succeed([self._filehandle.read(length)])
def close(self):
# the originator of the filehandle reserves the right to close it
pass
class FileName(FileHandle):
def __init__(self, filename, convergence):
"""
Upload the data from the filename. If convergence is None then a
random encryption key will be used, else the plaintext will be hashed,
then the hash will be hashed together with the string in the
"convergence" argument to form the encryption key.
"""
assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
def close(self):
FileHandle.close(self)
self._filehandle.close()
class Data(FileHandle):
def __init__(self, data, convergence):
"""
Upload the data from the data argument. If convergence is None then a
random encryption key will be used, else the plaintext will be hashed,
then the hash will be hashed together with the string in the
"convergence" argument to form the encryption key.
"""
assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
FileHandle.__init__(self, StringIO(data), convergence=convergence)
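# The three IUploadable flavors above differ only in where the bytes come
# from; a hedged usage sketch (the path and secret are made up):
#
#   convergence = "\x17" * 16  # client's convergence secret, or None
#   u1 = Data("some small payload", convergence)
#   u2 = FileName("/tmp/backup.tar", convergence)   # closes its own handle
#   u3 = FileHandle(open("/tmp/backup.tar", "rb"), convergence)  # caller closes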
@implementer(IUploader)
class Uploader(service.MultiService, log.PrefixingLogMixin):
"""I am a service that allows file uploading. I am a service-child of the
Client.
"""
name = "uploader"
URI_LIT_SIZE_THRESHOLD = 55
def __init__(self, helper_furl=None, stats_provider=None, history=None, progress=None):
self._helper_furl = helper_furl
self.stats_provider = stats_provider
self._history = history
self._helper = None
self._all_uploads = weakref.WeakKeyDictionary() # for debugging
self._progress = progress
log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
service.MultiService.__init__(self)
def startService(self):
service.MultiService.startService(self)
if self._helper_furl:
self.parent.tub.connectTo(self._helper_furl,
self._got_helper)
def _got_helper(self, helper):
self.log("got helper connection, getting versions")
default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
{ },
"application-version": "unknown: no get_version()",
}
d = add_version_to_remote_reference(helper, default)
d.addCallback(self._got_versioned_helper)
def _got_versioned_helper(self, helper):
needed = "http://allmydata.org/tahoe/protocols/helper/v1"
if needed not in helper.version:
raise InsufficientVersionError(needed, helper.version)
self._helper = helper
helper.notifyOnDisconnect(self._lost_helper)
def _lost_helper(self):
self._helper = None
def get_helper_info(self):
# return a tuple of (helper_furl_or_None, connected_bool)
return (self._helper_furl, bool(self._helper))
def upload(self, uploadable, progress=None, reactor=None):
"""
Returns a Deferred that will fire with the UploadResults instance.
"""
assert self.parent
assert self.running
assert progress is None or IProgress.providedBy(progress)
uploadable = IUploadable(uploadable)
d = uploadable.get_size()
def _got_size(size):
default_params = self.parent.get_encoding_parameters()
precondition(isinstance(default_params, dict), default_params)
precondition("max_segment_size" in default_params, default_params)
uploadable.set_default_encoding_parameters(default_params)
if progress:
progress.set_progress_total(size)
if self.stats_provider:
self.stats_provider.count('uploader.files_uploaded', 1)
self.stats_provider.count('uploader.bytes_uploaded', size)
if size <= self.URI_LIT_SIZE_THRESHOLD:
uploader = LiteralUploader(progress=progress)
return uploader.start(uploadable)
else:
eu = EncryptAnUploadable(uploadable, self._parentmsgid)
d2 = defer.succeed(None)
storage_broker = self.parent.get_storage_broker()
if self._helper:
uploader = AssistedUploader(self._helper, storage_broker)
d2.addCallback(lambda x: eu.get_storage_index())
d2.addCallback(lambda si: uploader.start(eu, si))
else:
storage_broker = self.parent.get_storage_broker()
secret_holder = self.parent._secret_holder
uploader = CHKUploader(storage_broker, secret_holder, progress=progress, reactor=reactor)
d2.addCallback(lambda x: uploader.start(eu))
self._all_uploads[uploader] = None
if self._history:
self._history.add_upload(uploader.get_upload_status())
def turn_verifycap_into_read_cap(uploadresults):
# Generate the uri from the verifycap plus the key.
d3 = uploadable.get_encryption_key()
def put_readcap_into_results(key):
v = uri.from_string(uploadresults.get_verifycapstr())
r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
uploadresults.set_uri(r.to_string())
return uploadresults
d3.addCallback(put_readcap_into_results)
return d3
d2.addCallback(turn_verifycap_into_read_cap)
return d2
d.addCallback(_got_size)
def _done(res):
uploadable.close()
return res
d.addBoth(_done)
return d
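# End to end, a client-side upload looks roughly like this (hedged sketch;
# 'client' stands for a running allmydata Client with this service attached
# under the name "uploader"):
#
#   uploader = client.getServiceNamed("uploader")
#   d = uploader.upload(FileName("/tmp/backup.tar", convergence=None))
#   d.addCallback(lambda results: results.get_uri())  # CHK or LIT readcap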