mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-03-24 04:35:17 +00:00
CHK upload helper: don't let one failed upload prevent us from trying again
This commit is contained in:
parent
96d5455a53
commit
69a0b5cc00
@ -4,6 +4,7 @@ from zope.interface import implements
|
||||
from twisted.application import service
|
||||
from twisted.internet import defer
|
||||
from foolscap import Referenceable
|
||||
from foolscap.eventual import eventually
|
||||
from allmydata import upload, interfaces
|
||||
from allmydata.util import idlib, log, observer, fileutil
|
||||
|
||||
@ -53,11 +54,10 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
|
||||
# If not, return (UploadResults,None) .
|
||||
self.log("deciding whether to upload the file or not", level=log.NOISY)
|
||||
if os.path.exists(self._encoding_file):
|
||||
# we have the whole file, and we're currently encoding it. The
|
||||
# caller will get to see the results when we're done. TODO: how
|
||||
# should they get upload progress in this case?
|
||||
self.log("encoding in progress", level=log.UNUSUAL)
|
||||
return self._finished_observers.when_fired()
|
||||
# we have the whole file, and we might be encoding it (or the
|
||||
# encode/upload might have failed, and we need to restart it).
|
||||
self.log("ciphertext already in place", level=log.UNUSUAL)
|
||||
return ({}, self)
|
||||
if os.path.exists(self._incoming_file):
|
||||
# we have some of the file, but not all of it (otherwise we'd be
|
||||
# encoding). The caller might be useful.
|
||||
@ -76,11 +76,6 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
|
||||
# reader is an RIEncryptedUploadable. I am specified to return an
|
||||
# UploadResults dictionary.
|
||||
|
||||
if os.path.exists(self._encoding_file):
|
||||
# we've already started encoding, so we have no use for the
|
||||
# reader. Notify them when we're done.
|
||||
return self._finished_observers.when_fired()
|
||||
|
||||
# let our fetcher pull ciphertext from the reader.
|
||||
self._fetcher.add_reader(reader)
|
||||
# and also hashes
|
||||
@ -159,13 +154,18 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
|
||||
|
||||
def add_reader(self, reader):
|
||||
AskUntilSuccessMixin.add_reader(self, reader)
|
||||
self._start()
|
||||
eventually(self._start)
|
||||
|
||||
def _start(self):
|
||||
if self._started:
|
||||
return
|
||||
self._started = True
|
||||
|
||||
if os.path.exists(self._encoding_file):
|
||||
self.log("ciphertext already present, bypassing fetch",
|
||||
level=log.UNUSUAL)
|
||||
return self._done2()
|
||||
|
||||
# first, find out how large the file is going to be
|
||||
d = self.call("get_size")
|
||||
d.addCallback(self._got_size)
|
||||
@ -249,11 +249,14 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
|
||||
def _done(self, res):
|
||||
self._f.close()
|
||||
self._f = None
|
||||
self._readers = []
|
||||
self.log(format="done fetching ciphertext, size=%(size)d",
|
||||
size=os.stat(self._incoming_file)[stat.ST_SIZE],
|
||||
level=log.NOISY)
|
||||
os.rename(self._incoming_file, self._encoding_file)
|
||||
return self._done2()
|
||||
|
||||
def _done2(self):
|
||||
self._readers = []
|
||||
self._done_observers.fire(None)
|
||||
|
||||
def _failed(self, f):
|
||||
|
@ -7,7 +7,8 @@ from foolscap import Tub, eventual
|
||||
from foolscap.logging import log
|
||||
|
||||
from allmydata import upload, offloaded
|
||||
from allmydata.util import hashutil, fileutil
|
||||
from allmydata.util import hashutil, fileutil, idlib
|
||||
from pycryptopp.cipher.aes import AES
|
||||
|
||||
MiB = 1024*1024
|
||||
|
||||
@ -106,6 +107,48 @@ class AssistedUpload(unittest.TestCase):
|
||||
|
||||
return d
|
||||
|
||||
def test_previous_upload_failed(self):
|
||||
self.basedir = "helper/AssistedUpload/test_previous_upload_failed"
|
||||
self.setUpHelper(self.basedir)
|
||||
DATA = "I need help\n" * 1000
|
||||
|
||||
# we want to make sure that an upload which fails (leaving the
|
||||
# ciphertext in the CHK_encoding/ directory) does not prevent a later
|
||||
# attempt to upload that file from working. We simulate this by
|
||||
# populating the directory manually.
|
||||
key = hashutil.key_hash(DATA)[:16]
|
||||
encryptor = AES(key)
|
||||
SI = hashutil.storage_index_chk_hash(key)
|
||||
SI_s = idlib.b2a(SI)
|
||||
encfile = os.path.join(self.basedir, "CHK_encoding", SI_s)
|
||||
f = open(encfile, "wb")
|
||||
f.write(encryptor.process(DATA))
|
||||
f.close()
|
||||
|
||||
u = upload.Uploader(self.helper_furl)
|
||||
u.setServiceParent(self.s)
|
||||
|
||||
# wait a few turns
|
||||
d = eventual.fireEventually()
|
||||
d.addCallback(eventual.fireEventually)
|
||||
d.addCallback(eventual.fireEventually)
|
||||
|
||||
def _ready(res):
|
||||
assert u._helper
|
||||
return u.upload_data(DATA)
|
||||
d.addCallback(_ready)
|
||||
def _uploaded(uri):
|
||||
assert "CHK" in uri
|
||||
d.addCallback(_uploaded)
|
||||
|
||||
def _check_empty(res):
|
||||
files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
|
||||
self.failUnlessEqual(files, [])
|
||||
files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
|
||||
self.failUnlessEqual(files, [])
|
||||
d.addCallback(_check_empty)
|
||||
|
||||
return d
|
||||
|
||||
def test_already_uploaded(self):
|
||||
self.basedir = "helper/AssistedUpload/test_already_uploaded"
|
||||
|
Loading…
x
Reference in New Issue
Block a user