#527: respond to GETs with early ranges quickly, without waiting for the whole file to download. Fixes the alacrity problems with the earlier code. Still needs cache expiration.

This commit is contained in:
Brian Warner 2008-10-28 17:56:18 -07:00
parent 37e3d8e47c
commit b1ca238176
2 changed files with 121 additions and 55 deletions

View File

@ -5,8 +5,10 @@ from zope.interface import implements
from twisted.internet import defer
from twisted.internet.interfaces import IPushProducer, IConsumer
from twisted.protocols import basic
from allmydata.interfaces import IFileNode, IFileURI, ICheckable
from allmydata.util import observer, log, base32
from foolscap.eventual import eventually
from allmydata.interfaces import IFileNode, IFileURI, ICheckable, \
IDownloadTarget
from allmydata.util import log, base32
from allmydata.immutable.checker import SimpleCHKFileChecker, \
SimpleCHKFileVerifier
from allmydata.immutable import download
@ -60,21 +62,116 @@ class PortionOfFile:
self.bytes_left -= len(data)
return data
class FileNode(_ImmutableFileNodeBase):
checker_class = SimpleCHKFileChecker
verifier_class = SimpleCHKFileVerifier
class CacheFile:
implements(IDownloadTarget)
def __init__(self, uri, client, cachefile):
_ImmutableFileNodeBase.__init__(self, uri, client)
self.cachefile = cachefile
def __init__(self, node, filename):
self.node = node
self.milestones = set() # of (offset,size,Deferred)
self.cachefilename = filename
self.download_in_progress = False
# five states:
# new FileNode, no downloads ever performed
# new FileNode, leftover file (partial)
# new FileNode, leftover file (whole)
# download in progress, not yet complete
# download complete
def when_range_available(self, offset, size):
assert isinstance(offset, (int,long))
assert isinstance(size, (int,long))
d = defer.Deferred()
self.milestones.add( (offset,size,d) )
self._check_milestones()
if self.milestones and not self.download_in_progress:
self.download_in_progress = True
log.msg(format=("immutable filenode read [%(si)s]: " +
"starting download"),
si=base32.b2a(self.node.u.storage_index),
umid="h26Heg", level=log.OPERATIONAL)
downloader = self.node._client.getServiceNamed("downloader")
d2 = downloader.download(self.node.get_uri(), self)
d2.addBoth(self._download_done)
d2.addErrback(self._download_failed)
d2.addErrback(log.err, umid="cQaM9g")
return d
def read(self, consumer, offset, size):
assert offset+size <= self.get_filesize()
f = PortionOfFile(self.cachefilename, offset, size)
d = basic.FileSender().beginFileTransfer(f, consumer)
d.addCallback(lambda lastSent: consumer)
return d
def _download_done(self, res):
# clear download_in_progress, so failed downloads can be re-tried
self.download_in_progress = False
self.fully_cached_observer = observer.OneShotObserverList()
return res
def _download_failed(self, f):
# tell anyone who's waiting that we failed
for m in self.milestones:
(offset,size,d) = m
eventually(d.errback, f)
self.milestones.clear()
def _check_milestones(self):
current_size = self.get_filesize()
for m in list(self.milestones):
(offset,size,d) = m
if offset+size <= current_size:
log.msg(format=("immutable filenode read [%(si)s] " +
"%(offset)d+%(size)d vs %(filesize)d: " +
"done"),
si=base32.b2a(self.node.u.storage_index),
offset=offset, size=size, filesize=current_size,
umid="nuedUg", level=log.NOISY)
self.milestones.discard(m)
eventually(d.callback, None)
else:
log.msg(format=("immutable filenode read [%(si)s] " +
"%(offset)d+%(size)d vs %(filesize)d: " +
"still waiting"),
si=base32.b2a(self.node.u.storage_index),
offset=offset, size=size, filesize=current_size,
umid="8PKOhg", level=log.NOISY)
def get_filesize(self):
try:
filesize = os.stat(self.cachefilename)[stat.ST_SIZE]
except OSError:
filesize = 0
return filesize
def open(self, size):
self.f = open(self.cachefilename, "wb")
def write(self, data):
self.f.write(data)
self._check_milestones()
def close(self):
self.f.close()
self._check_milestones()
def fail(self, why):
pass
def register_canceller(self, cb):
pass
def finish(self):
return None
class FileNode(_ImmutableFileNodeBase):
checker_class = SimpleCHKFileChecker
verifier_class = SimpleCHKFileVerifier
def __init__(self, uri, client, cachefile):
_ImmutableFileNodeBase.__init__(self, uri, client)
self.cachefile = CacheFile(self, cachefile)
def get_uri(self):
return self.u.to_string()
@ -121,23 +218,6 @@ class FileNode(_ImmutableFileNodeBase):
if size is None:
size = self.get_size() - offset
assert self.cachefile
try:
filesize = os.stat(self.cachefile)[stat.ST_SIZE]
except OSError:
filesize = 0
if filesize >= offset+size:
log.msg(format=("immutable filenode read [%(si)s]: " +
"satisfied from cache " +
"(read %(start)d+%(size)d, filesize %(filesize)d)"),
si=base32.b2a(self.u.storage_index),
start=offset, size=size, filesize=filesize,
umid="5p5ECA", level=log.OPERATIONAL)
f = PortionOfFile(self.cachefile, offset, size)
d = basic.FileSender().beginFileTransfer(f, consumer)
d.addCallback(lambda lastSent: consumer)
return d
if offset == 0 and size == self.get_size():
# don't use the cache, just do a normal streaming download
@ -147,37 +227,10 @@ class FileNode(_ImmutableFileNodeBase):
umid="VRSBwg", level=log.OPERATIONAL)
return self.download(download.ConsumerAdapter(consumer))
if not self.download_in_progress:
log.msg(format=("immutable filenode read [%(si)s]: " +
"starting download"),
si=base32.b2a(self.u.storage_index),
umid="h26Heg", level=log.OPERATIONAL)
self.start_download_to_cache()
# The file is being downloaded, but the portion we want isn't yet
# available, so we have to wait. First cut: wait for the whole thing
# to download. The second cut will be to wait for a specific range
# milestone, with a download target that counts bytes and compares
# them against a milestone list.
log.msg(format=("immutable filenode read [%(si)s]: " +
"waiting for download"),
si=base32.b2a(self.u.storage_index),
umid="l48V7Q", level=log.OPERATIONAL)
d = self.when_fully_cached()
d.addCallback(lambda ignored: self.read(consumer, offset, size))
d = self.cachefile.when_range_available(offset, size)
d.addCallback(lambda res: self.cachefile.read(consumer, offset, size))
return d
def start_download_to_cache(self):
assert not self.download_in_progress
self.download_in_progress = True
downloader = self._client.getServiceNamed("downloader")
d = downloader.download_to_filename(self.get_uri(), self.cachefile)
d.addBoth(self.fully_cached_observer.fire)
def when_fully_cached(self):
return self.fully_cached_observer.when_fired()
def download(self, target):
downloader = self._client.getServiceNamed("downloader")
return downloader.download(self.get_uri(), target)

View File

@ -202,9 +202,22 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
def _read_tail_done(mc):
self.failUnlessEqual("".join(mc.chunks), DATA[2:])
d.addCallback(_read_tail_done)
return d
d.addCallback(_test_read)
def _test_bad_read(res):
bad_u = uri.from_string_filenode(self.uri)
bad_u.key = self.flip_bit(bad_u.key)
bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
# this should cause an error during download
d = self.shouldFail2(NotEnoughSharesError, "'download bad node'",
None,
bad_n.read, MemoryConsumer(), offset=2)
return d
d.addCallback(_test_bad_read)
def _download_nonexistent_uri(res):
baduri = self.mangle_uri(self.uri)
log.msg("about to download non-existent URI", level=log.UNUSUAL,