diff --git a/src/allmydata/immutable/filenode.py b/src/allmydata/immutable/filenode.py index a9a4aa4e6..40c98a83d 100644 --- a/src/allmydata/immutable/filenode.py +++ b/src/allmydata/immutable/filenode.py @@ -5,8 +5,10 @@ from zope.interface import implements from twisted.internet import defer from twisted.internet.interfaces import IPushProducer, IConsumer from twisted.protocols import basic -from allmydata.interfaces import IFileNode, IFileURI, ICheckable -from allmydata.util import observer, log, base32 +from foolscap.eventual import eventually +from allmydata.interfaces import IFileNode, IFileURI, ICheckable, \ + IDownloadTarget +from allmydata.util import log, base32 from allmydata.immutable.checker import SimpleCHKFileChecker, \ SimpleCHKFileVerifier from allmydata.immutable import download @@ -60,21 +62,116 @@ class PortionOfFile: self.bytes_left -= len(data) return data -class FileNode(_ImmutableFileNodeBase): - checker_class = SimpleCHKFileChecker - verifier_class = SimpleCHKFileVerifier +class CacheFile: + implements(IDownloadTarget) - def __init__(self, uri, client, cachefile): - _ImmutableFileNodeBase.__init__(self, uri, client) - self.cachefile = cachefile + def __init__(self, node, filename): + self.node = node + self.milestones = set() # of (offset,size,Deferred) + self.cachefilename = filename + self.download_in_progress = False # five states: # new FileNode, no downloads ever performed # new FileNode, leftover file (partial) # new FileNode, leftover file (whole) # download in progress, not yet complete # download complete + + def when_range_available(self, offset, size): + assert isinstance(offset, (int,long)) + assert isinstance(size, (int,long)) + + d = defer.Deferred() + self.milestones.add( (offset,size,d) ) + self._check_milestones() + if self.milestones and not self.download_in_progress: + self.download_in_progress = True + log.msg(format=("immutable filenode read [%(si)s]: " + + "starting download"), + si=base32.b2a(self.node.u.storage_index), + umid="h26Heg", level=log.OPERATIONAL) + downloader = self.node._client.getServiceNamed("downloader") + d2 = downloader.download(self.node.get_uri(), self) + d2.addBoth(self._download_done) + d2.addErrback(self._download_failed) + d2.addErrback(log.err, umid="cQaM9g") + return d + + def read(self, consumer, offset, size): + assert offset+size <= self.get_filesize() + f = PortionOfFile(self.cachefilename, offset, size) + d = basic.FileSender().beginFileTransfer(f, consumer) + d.addCallback(lambda lastSent: consumer) + return d + + def _download_done(self, res): + # clear download_in_progress, so failed downloads can be re-tried self.download_in_progress = False - self.fully_cached_observer = observer.OneShotObserverList() + return res + + def _download_failed(self, f): + # tell anyone who's waiting that we failed + for m in self.milestones: + (offset,size,d) = m + eventually(d.errback, f) + self.milestones.clear() + + def _check_milestones(self): + current_size = self.get_filesize() + for m in list(self.milestones): + (offset,size,d) = m + if offset+size <= current_size: + log.msg(format=("immutable filenode read [%(si)s] " + + "%(offset)d+%(size)d vs %(filesize)d: " + + "done"), + si=base32.b2a(self.node.u.storage_index), + offset=offset, size=size, filesize=current_size, + umid="nuedUg", level=log.NOISY) + self.milestones.discard(m) + eventually(d.callback, None) + else: + log.msg(format=("immutable filenode read [%(si)s] " + + "%(offset)d+%(size)d vs %(filesize)d: " + + "still waiting"), + si=base32.b2a(self.node.u.storage_index), + offset=offset, size=size, filesize=current_size, + umid="8PKOhg", level=log.NOISY) + + def get_filesize(self): + try: + filesize = os.stat(self.cachefilename)[stat.ST_SIZE] + except OSError: + filesize = 0 + return filesize + + + def open(self, size): + self.f = open(self.cachefilename, "wb") + + def write(self, data): + self.f.write(data) + self._check_milestones() + + def close(self): + self.f.close() + self._check_milestones() + + def fail(self, why): + pass + def register_canceller(self, cb): + pass + def finish(self): + return None + + + +class FileNode(_ImmutableFileNodeBase): + checker_class = SimpleCHKFileChecker + verifier_class = SimpleCHKFileVerifier + + def __init__(self, uri, client, cachefile): + _ImmutableFileNodeBase.__init__(self, uri, client) + self.cachefile = CacheFile(self, cachefile) def get_uri(self): return self.u.to_string() @@ -121,23 +218,6 @@ class FileNode(_ImmutableFileNodeBase): if size is None: size = self.get_size() - offset - assert self.cachefile - - try: - filesize = os.stat(self.cachefile)[stat.ST_SIZE] - except OSError: - filesize = 0 - if filesize >= offset+size: - log.msg(format=("immutable filenode read [%(si)s]: " + - "satisfied from cache " + - "(read %(start)d+%(size)d, filesize %(filesize)d)"), - si=base32.b2a(self.u.storage_index), - start=offset, size=size, filesize=filesize, - umid="5p5ECA", level=log.OPERATIONAL) - f = PortionOfFile(self.cachefile, offset, size) - d = basic.FileSender().beginFileTransfer(f, consumer) - d.addCallback(lambda lastSent: consumer) - return d if offset == 0 and size == self.get_size(): # don't use the cache, just do a normal streaming download @@ -147,37 +227,10 @@ class FileNode(_ImmutableFileNodeBase): umid="VRSBwg", level=log.OPERATIONAL) return self.download(download.ConsumerAdapter(consumer)) - if not self.download_in_progress: - log.msg(format=("immutable filenode read [%(si)s]: " + - "starting download"), - si=base32.b2a(self.u.storage_index), - umid="h26Heg", level=log.OPERATIONAL) - self.start_download_to_cache() - - # The file is being downloaded, but the portion we want isn't yet - # available, so we have to wait. First cut: wait for the whole thing - # to download. The second cut will be to wait for a specific range - # milestone, with a download target that counts bytes and compares - # them against a milestone list. - log.msg(format=("immutable filenode read [%(si)s]: " + - "waiting for download"), - si=base32.b2a(self.u.storage_index), - umid="l48V7Q", level=log.OPERATIONAL) - d = self.when_fully_cached() - d.addCallback(lambda ignored: self.read(consumer, offset, size)) + d = self.cachefile.when_range_available(offset, size) + d.addCallback(lambda res: self.cachefile.read(consumer, offset, size)) return d - def start_download_to_cache(self): - assert not self.download_in_progress - self.download_in_progress = True - downloader = self._client.getServiceNamed("downloader") - d = downloader.download_to_filename(self.get_uri(), self.cachefile) - d.addBoth(self.fully_cached_observer.fire) - - def when_fully_cached(self): - return self.fully_cached_observer.when_fired() - - def download(self, target): downloader = self._client.getServiceNamed("downloader") return downloader.download(self.get_uri(), target) diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index 70c03183c..5a2fff158 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -202,9 +202,22 @@ class SystemTest(SystemTestMixin, unittest.TestCase): def _read_tail_done(mc): self.failUnlessEqual("".join(mc.chunks), DATA[2:]) d.addCallback(_read_tail_done) + return d d.addCallback(_test_read) + def _test_bad_read(res): + bad_u = uri.from_string_filenode(self.uri) + bad_u.key = self.flip_bit(bad_u.key) + bad_n = self.clients[1].create_node_from_uri(bad_u.to_string()) + # this should cause an error during download + + d = self.shouldFail2(NotEnoughSharesError, "'download bad node'", + None, + bad_n.read, MemoryConsumer(), offset=2) + return d + d.addCallback(_test_bad_read) + def _download_nonexistent_uri(res): baduri = self.mangle_uri(self.uri) log.msg("about to download non-existent URI", level=log.UNUSUAL,