upload-helper: avoid duplicate uploads: check the grid to see if the file already exists

Brian Warner 2008-01-30 18:49:02 -07:00
parent a1b155725f
commit 81eeafc574
3 changed files with 177 additions and 19 deletions


@ -5,14 +5,125 @@ from twisted.application import service
from twisted.internet import defer
from foolscap import Referenceable
from foolscap.eventual import eventually
from allmydata import upload, interfaces
from allmydata.util import idlib, log, observer, fileutil
from allmydata import upload, interfaces, storage, uri
from allmydata.util import idlib, log, observer, fileutil, hashutil
class NotEnoughWritersError(Exception):
pass
class CHKCheckerAndUEBFetcher:
"""I check to see if a file is already present in the grid. I also fetch
the URI Extension Block, which is useful for an uploading client who
wants to avoid the work of encryption and encoding.
I return False if the file is not completely healthy: i.e. if there are
fewer than 'N' shares present.
If the file is completely healthy, I return a tuple of (sharemap,
UEB_data, UEB_hash).
"""
def __init__(self, peer_getter, storage_index, logparent=None):
self._peer_getter = peer_getter
self._found_shares = set()
self._storage_index = storage_index
self._sharemap = {}
self._readers = set()
self._ueb_hash = None
self._ueb_data = None
self._logparent = logparent
def log(self, *args, **kwargs):
if 'facility' not in kwargs:
kwargs['facility'] = "tahoe.helper.chk.checkandUEBfetch"
if 'parent' not in kwargs:
kwargs['parent'] = self._logparent
return log.msg(*args, **kwargs)
def check(self):
d = self._get_all_shareholders(self._storage_index)
d.addCallback(self._get_uri_extension)
d.addCallback(self._done)
return d
def _get_all_shareholders(self, storage_index):
dl = []
for (pmpeerid, peerid, connection) in self._peer_getter(storage_index):
d = connection.callRemote("get_service", "storageserver")
d.addCallback(lambda ss: ss.callRemote("get_buckets",
storage_index))
d.addCallbacks(self._got_response, self._got_error,
callbackArgs=(peerid,))
dl.append(d)
return defer.DeferredList(dl)
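The fan-out above follows a standard Twisted pattern; a minimal standalone sketch of the same idea, using a hypothetical per-server get_buckets API rather than Tahoe's foolscap calls:

from twisted.internet import defer

def query_all(servers, storage_index):
    """Fan out one query per server; the DeferredList fires only after
    every server has either answered or failed."""
    def on_response(buckets, server):
        return (server, buckets)
    def on_error(f):
        # one bad server must not abort the whole check
        return (None, {})
    dl = []
    for server in servers:
        d = server.get_buckets(storage_index)  # hypothetical per-server API
        d.addCallbacks(on_response, on_error, callbackArgs=(server,))
        dl.append(d)
    return defer.DeferredList(dl)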
def _got_response(self, buckets, peerid):
# buckets is a dict: maps shnum to an rref of the server who holds it
shnums_s = ",".join([str(shnum) for shnum in buckets])
self.log("got_response: [%s] has %d shares (%s)" %
(idlib.shortnodeid_b2a(peerid), len(buckets), shnums_s),
level=log.NOISY)
self._found_shares.update(buckets.keys())
for k in buckets:
if k not in self._sharemap:
self._sharemap[k] = []
self._sharemap[k].append(peerid)
self._readers.update(buckets.values())
def _got_error(self, f):
# a failure from any single server is logged, but the overall check continues
log.err(f, parent=self._logparent)
def _get_uri_extension(self, res):
# assume that we can pull the UEB from any share. If we get an error,
# declare the whole file unavailable.
if not self._readers:
self.log("no readers, so no UEB", level=log.NOISY)
return
b = self._readers.pop()
rbp = storage.ReadBucketProxy(b)
d = rbp.startIfNecessary()
d.addCallback(lambda res: rbp.get_uri_extension())
d.addCallback(self._got_uri_extension)
d.addErrback(self._ueb_error)
return d
def _got_uri_extension(self, ueb):
self.log("_got_uri_extension", level=log.NOISY)
self._ueb_hash = hashutil.uri_extension_hash(ueb)
self._ueb_data = uri.unpack_extension(ueb)
def _ueb_error(self, f):
# an error means the file is unavailable, but the overall check
# shouldn't fail.
self.log("UEB fetch failed", failure=f, level=log.WEIRD)
return None
def _done(self, res):
if self._ueb_data:
found = len(self._found_shares)
total = self._ueb_data['total_shares']
self.log(format="got %(found)d shares of %(total)d",
found=found, total=total, level=log.NOISY)
if found < total:
# not all shares are present in the grid
self.log("not enough to qualify, file not found in grid",
level=log.NOISY)
return False
# all shares are present
self.log("all shares present, file is found in grid",
level=log.NOISY)
return (self._sharemap, self._ueb_data, self._ueb_hash)
# no shares are present
self.log("unable to find UEB data, file not found in grid",
level=log.NOISY)
return False
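A sketch of how a caller might consume the checker's contract, mirroring the Helper code further down; peer_getter, storage_index, and the _interpret callback are illustrative placeholders:

def check_for_existing_file(peer_getter, storage_index, logparent=None):
    # the Deferred fires with False, or with (sharemap, ueb_data, ueb_hash)
    c = CHKCheckerAndUEBFetcher(peer_getter, storage_index, logparent)
    d = c.check()
    def _interpret(res):
        if not res:
            return None  # fewer than N shares: treat the file as absent
        (sharemap, ueb_data, ueb_hash) = res
        return ueb_hash  # enough to build upload_results without re-encoding
    d.addCallback(_interpret)
    return d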
class CHKUploadHelper(Referenceable, upload.CHKUploader):
"""I am the helper-server -side counterpart to AssistedUploader. I handle
peer selection, encoding, and share pushing. I read ciphertext from the
@ -63,13 +174,8 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
# encoding). The caller might be useful.
self.log("partial ciphertext already present", level=log.UNUSUAL)
return ({}, self)
# we don't remember uploading this file, but it might already be in
# the grid. For now we do an unconditional upload. TODO: Do a quick
# checker run (send one query to each storage server) to see who has
the file. Then accommodate a lazy uploader by retrieving the UEB
# from one of the shares and hash it.
#return ({'uri_extension_hash': hashutil.uri_extension_hash("")},self)
self.log("no record of having uploaded the file", level=log.NOISY)
# we don't remember uploading this file
self.log("no ciphertext yet", level=log.NOISY)
return ({}, self)
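For context, a hedged sketch of what the remote caller does with the (upload_results, upload_helper) pair returned here; this is a simplification, not the actual AssistedUploader code:

def _got_helper_reply(reply, reader):
    (upload_results, upload_helper) = reply
    if upload_helper is None:
        # the helper found every share in the grid; no ciphertext needed
        return upload_results
    # otherwise stream our ciphertext to the helper's remote_upload()
    return upload_helper.callRemote("upload", reader)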
def remote_upload(self, reader):
@ -352,13 +458,47 @@ class Helper(Referenceable, service.MultiService):
if storage_index in self._active_uploads:
self.log("upload is currently active", parent=lp)
uh = self._active_uploads[storage_index]
else:
self.log("creating new upload helper", parent=lp)
uh = self.chk_upload_helper_class(storage_index, self,
incoming_file, encoding_file,
lp)
self._active_uploads[storage_index] = uh
return uh.start()
d = self._check_for_chk_already_in_grid(storage_index, lp)
def _checked(upload_results):
if upload_results:
return (upload_results, None)
# the file is not present in the grid, by which we mean there are
fewer than 'N' shares available.
self.log("unable to find file in the grid", level=log.NOISY)
# We need an upload helper. Check our active uploads again in
# case there was a race.
if storage_index in self._active_uploads:
self.log("upload is currently active", parent=lp)
uh = self._active_uploads[storage_index]
else:
self.log("creating new upload helper", parent=lp)
uh = self.chk_upload_helper_class(storage_index, self,
incoming_file, encoding_file,
lp)
self._active_uploads[storage_index] = uh
return uh.start()
d.addCallback(_checked)
return d
def _check_for_chk_already_in_grid(self, storage_index, lp):
# see if this file is already in the grid
lp2 = self.log("doing a quick check+UEBfetch",
parent=lp, level=log.NOISY)
c = CHKCheckerAndUEBFetcher(self.parent.get_permuted_peers,
storage_index, lp2)
d = c.check()
def _checked(res):
if res:
(sharemap, ueb_data, ueb_hash) = res
self.log("found file in grid", level=log.NOISY, parent=lp)
upload_results = {'uri_extension_hash': ueb_hash}
return upload_results
return False
d.addCallback(_checked)
return d
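To summarize the branching that the new check threads through upload(), an illustrative helper, assuming the tuple layout from CHKCheckerAndUEBFetcher._done:

def summarize_check(check_result):
    # mirrors _check_for_chk_already_in_grid._checked plus upload()._checked
    if check_result:
        (sharemap, ueb_data, ueb_hash) = check_result
        return ({'uri_extension_hash': ueb_hash}, None)  # done, no helper
    return None  # caller falls through and starts a real upload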
def upload_finished(self, storage_index):
del self._active_uploads[storage_index]


@ -43,6 +43,8 @@ class FakeClient(service.MultiService):
return True
def get_encoding_parameters(self):
return self.DEFAULT_ENCODING_PARAMETERS
def get_permuted_peers(self, storage_index):
return []
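Returning an empty peer list makes the new grid check trivially report "not found", so these unit tests always take the real-upload path. A richer stub, matching the (pmpeerid, peerid, connection) tuples the checker iterates, could exercise the share-counting half in isolation; this is hypothetical test scaffolding, and the UEB fetch would still fail since the bucket rrefs are elided:

from twisted.internet import defer

class StubStorageServer:
    # stands in for the remote 'storageserver' service facet
    def __init__(self, buckets):
        self._buckets = buckets
    def callRemote(self, methname, *args):
        assert methname == "get_buckets"
        return defer.succeed(self._buckets)

class StubConnection:
    def __init__(self, ss):
        self._ss = ss
    def callRemote(self, methname, *args):
        assert methname == "get_service"
        return defer.succeed(self._ss)

def fake_peer_getter(storage_index):
    # one peer claiming shares 0 and 1; bucket rrefs are None, so only
    # share counting is exercised, not the UEB fetch
    ss = StubStorageServer({0: None, 1: None})
    return [("pmpeerid", "x" * 20, StubConnection(ss))]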
def flush_but_dont_ignore(res):
d = eventual.flushEventualQueue()


@ -289,19 +289,35 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
extra_node.getServiceNamed("storageserver").sizelimit = 0
d.addCallback(_added)
HELPER_DATA = "Data that needs help to upload" * 1000
def _upload_with_helper(res):
DATA = "Data that needs help to upload" * 1000
u = upload.Data(DATA, contenthashkey=contenthashkey)
u = upload.Data(HELPER_DATA, contenthashkey=contenthashkey)
d = self.extra_node.upload(u)
def _uploaded(uri):
return self.downloader.download_to_data(uri)
d.addCallback(_uploaded)
def _check(newdata):
self.failUnlessEqual(newdata, DATA)
self.failUnlessEqual(newdata, HELPER_DATA)
d.addCallback(_check)
return d
d.addCallback(_upload_with_helper)
def _upload_duplicate_with_helper(res):
u = upload.Data(HELPER_DATA, contenthashkey=contenthashkey)
u.debug_stash_RemoteEncryptedUploadable = True
d = self.extra_node.upload(u)
def _uploaded(uri):
return self.downloader.download_to_data(uri)
d.addCallback(_uploaded)
def _check(newdata):
self.failUnlessEqual(newdata, HELPER_DATA)
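# the debug_stash flag above makes the upload code attach the
# RemoteEncryptedUploadable to 'u' once pushing starts, so the attribute's
# absence proves the duplicate upload was skipped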
self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
"uploadable started uploading, should have been avoided")
d.addCallback(_check)
return d
if contenthashkey:
d.addCallback(_upload_duplicate_with_helper)
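The duplicate-upload test is gated on contenthashkey because only content-hash keys make identical plaintexts land on the same storage index; random keys give every upload a fresh index, so there is nothing for the helper to find. An illustrative derivation, with made-up hashing rather than Tahoe's real key/index scheme:

import hashlib, os

def storage_index_for(plaintext, contenthashkey):
    # illustrative only, not Tahoe's actual key derivation
    if contenthashkey:
        key = hashlib.sha256("key:" + plaintext).digest()  # same data -> same key
    else:
        key = os.urandom(16)             # random key: every upload is unique
    return hashlib.sha256("si:" + key).digest()[:16]  # index follows the key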
def _upload_resumable(res):
DATA = "Data that needs help to upload and gets interrupted" * 1000
u1 = upload.Data(DATA, contenthashkey=contenthashkey)