fuse/blackmatch: added asynchronous (background) file download

Previously, upon opening a file for reading, the open() call would block
while the entire file was retrieved from tahoe into the cache directory.
This change adds a DownloaderWithReadQueue class, and associated plumbing,
such that an open() will return promptly with the download initiated 'in
the background'.  Subsequent read() operations will block until enough
data has been downloaded to satisfy that request.  This provides a behaviour
similar to streaming, i.e. the client application will be able to read
data from the fuse interface while the remainder of the file is still being
downloaded.
This commit is contained in:
robk-tahoe 2008-10-20 16:33:33 -07:00
parent 1cdfecb446
commit f08d181764

View File

@ -5,21 +5,25 @@ from allmydata.uri import CHKFileURI, NewDirectoryURI, LiteralFileURI
from allmydata.scripts.common_http import do_http as do_http_req
from allmydata.util.hashutil import tagged_hash
from allmydata.util.assertutil import precondition
from allmydata.util import base32, fileutil
from allmydata.util import base32, fileutil, observer
from allmydata.scripts.common import get_aliases
from twisted.python import usage
from twisted.python.failure import Failure
from twisted.internet.protocol import Factory, Protocol
from twisted.internet import reactor, defer
from twisted.internet import reactor, defer, task
from twisted.web import client
import base64
import errno
import heapq
import sha
import socket
import stat
import subprocess
import sys
import os
import weakref
#import pprint
# one needs either python-fuse to have been installed in sys.path, or
@ -150,6 +154,10 @@ class EEXIST(TFSIOError):
def __init__(self, msg):
TFSIOError.__init__(self, errno.EEXIST, msg)
class EIO(TFSIOError):
    """TFSIOError variant whose error code is always errno.EIO."""

    def __init__(self, msg):
        # The error code is fixed by this class; callers supply only the
        # message text.
        code = errno.EIO
        TFSIOError.__init__(self, code, msg)
def logargsretexc(meth):
def inner_logargsretexc(self, *args, **kwargs):
log("%s(%r, %r)" % (meth, args, kwargs))
@ -206,12 +214,116 @@ def repr_flags(flags=None):
ret = ['O_RDONLY']
return '|'.join(ret)
class DownloaderWithReadQueue(object):
    """Download a URL into a local file while serving reads of the partial file.

    start() kicks off twisted.web.client.downloadPage() into the destination
    file and a LoopingCall that polls the size of that file.  read() returns
    a Deferred: it fires immediately when the requested bytes are already on
    disk, otherwise the request is queued on a heap (ordered by the end
    offset of the read) and fired by the poller once the file has grown far
    enough.

    NOTE(review): log, logexc, log_exc and precondition come from elsewhere
    in this file / allmydata; logexc presumably logs and re-raises any
    exception from the wrapped method -- confirm.
    """

    def __init__(self):
        # Pending reads as (end_offset, start_offset, deferred) tuples, kept
        # as a heapq heap.  Set to None once the download has finished
        # (successfully or not); read() refuses to queue after that.
        self.read_heap = []
        self.dest_file_name = None
        # Both are assigned by start(); initialised here so that fail() and
        # done() never hit an AttributeError.
        self.target_size = None
        self.loop = None
        self.running = False
        self.done_observer = observer.OneShotObserverList()

    def __repr__(self):
        if self.dest_file_name is None:
            name = '<none>'
        else:
            name = os.path.basename(self.dest_file_name)
        return "<DWRQ(%s)> q(%s)" % (name, len(self.read_heap or []))

    def log(self, msg):
        """Log msg prefixed with this downloader's repr."""
        log("%r: %s" % (self, msg))

    @logexc
    def start(self, url, dest_file_name, target_size, interval=0.5):
        """Begin downloading url into dest_file_name.

        target_size is the expected final size of the file in bytes;
        interval is the polling period (seconds) used to service queued
        reads.  Returns the Deferred from downloadPage with done()/fail()
        attached.
        """
        self.log('start(%s, %s, %s)' % (url, dest_file_name, target_size, ))
        self.dest_file_name = dest_file_name
        # touch: create (or truncate) the destination so that get_size() and
        # _read() have a file to look at before the first byte arrives.
        open(self.dest_file_name, 'wb').close()
        self.target_size = target_size
        self.log('start()')
        self.loop = task.LoopingCall(self._check_file_size)
        self.loop.start(interval)
        self.running = True
        d = client.downloadPage(url, self.dest_file_name)
        d.addCallbacks(self.done, self.fail)
        return d

    def when_done(self):
        """Return done_observer.when_fired(); the observer is fired by done() or fail()."""
        return self.done_observer.when_fired()

    def get_size(self):
        """Return the current on-disk size of the destination file, or 0 if absent."""
        if os.path.exists(self.dest_file_name):
            return os.path.getsize(self.dest_file_name)
        return 0

    def _stop_loop(self):
        """Stop the size-polling loop if it was started and is still running."""
        if self.loop is not None and self.loop.running:
            self.loop.stop()

    @logexc
    def _read(self, posn, size):
        """Synchronously read size bytes at offset posn from the destination file."""
        f = open(self.dest_file_name, 'rb')
        try:
            f.seek(posn)
            return f.read(size)
        finally:
            # close even if seek()/read() raises, so the handle is not leaked
            f.close()

    @logexc
    def read(self, posn, size):
        """Return a Deferred firing with the bytes [posn, posn+size).

        The request is clamped to target_size.  A read starting at or beyond
        target_size yields an empty string.  Raises ValueError if the
        downloader has already shut down.  start() must have been called.
        """
        self.log('read(%s, %s)' % (posn, size))
        if self.read_heap is None:
            raise ValueError('read() called when already shut down')
        if posn >= self.target_size:
            # Nothing will ever exist at this offset; queueing the request
            # would leave it pending forever.
            return defer.succeed('')
        if posn + size > self.target_size:
            # Clamp to the bytes remaining before EOF.  (Subtracting the
            # remainder from size, as was done previously, gives the wrong
            # length unless size happens to be exactly twice the remainder.)
            size = self.target_size - posn
        end = posn + size
        # Bytes up to 'end' are available once the file is at least 'end'
        # bytes long -- the same test _check_file_size() applies.
        if end <= self.get_size():
            return defer.succeed(self._read(posn, size))
        d = defer.Deferred()
        heapq.heappush(self.read_heap, (end, posn, d))
        return d

    @logexc
    def _check_file_size(self):
        """Fire every queued read whose end offset is now covered by the file.

        Called periodically by the LoopingCall and once more from done().
        """
        if not self.read_heap:
            return
        try:
            size = self.get_size()
        except Exception:
            # best effort: log and try again on the next tick
            log_exc()
            return
        while self.read_heap and self.read_heap[0][0] <= size:
            end, start, d = heapq.heappop(self.read_heap)
            try:
                data = self._read(start, end - start)
            except Exception:
                # The request has already been popped off the heap, so it
                # must be failed here or its caller would wait forever.
                log_exc()
                d.errback(Failure())
                continue
            d.callback(data)

    @logexc
    def fail(self, failure):
        """Errback for the download: fail all pending reads and fire the observer.

        Returns failure so that it continues down the Deferred chain.
        """
        self.log('fail(%s)' % (failure,))
        self.running = False
        self._stop_loop()
        # fail any reads still pending
        for end, start, d in (self.read_heap or ()):
            reactor.callLater(0, d.errback, failure)
        self.read_heap = None
        self.done_observer.fire_if_not_fired(failure)
        return failure

    @logexc
    def done(self, result):
        """Callback for a completed download: flush pending reads and fire the observer.

        If the downloaded file does not have the expected size, or reads
        remain unsatisfied, the download is treated as failed (see fail())
        and the resulting Failure is returned instead of result.
        """
        self.log('done()')
        self.running = False
        self._stop_loop()
        try:
            precondition(self.get_size() == self.target_size, self.get_size(), self.target_size)
            self._check_file_size() # process anything left pending in heap
            precondition(not self.read_heap, self.read_heap, self.target_size, self.get_size())
        except Exception:
            # Route through fail() so that queued reads are errbacked and
            # done_observer is fired; simply raising here would leave both
            # waiting indefinitely.
            return self.fail(Failure())
        self.read_heap = None
        self.done_observer.fire_if_not_fired(self)
        return result
class TahoeFuseFile(object):
#def __init__(self, path, flags, *mode):
def __init__(self, tfs, path, flags, *mode):
log("TFF: __init__(%r, %r:%s, %r:%s)" % (path, flags, repr_flags(flags), mode, repr_mode(*mode)))
self.tfs = tfs
self.downloader = None
self._path = path # for tahoe put
try:
@ -250,8 +362,19 @@ class TahoeFuseFile(object):
self.fname = self.fnode.tmp_fname
log('TFF: reopening(%s) for reading' % self.fname)
else:
log('TFF: fetching file from cache for reading')
self.fname = self.tfs.cache.get_file(uri)
if uri.startswith("URI:LIT") or not self.tfs.async:
log('TFF: synchronously fetching file from cache for reading')
self.fname = self.tfs.cache.get_file(uri)
else:
log('TFF: asynchronously fetching file from cache for reading')
self.fname, self.downloader = self.tfs.cache.async_get_file(uri)
# downloader is None if the cache already contains the file
if self.downloader is not None:
d = self.downloader.when_done()
def download_complete(junk):
# once the download is complete, revert to non-async behaviour
self.downloader = None
d.addCallback(download_complete)
self.file = os.fdopen(os.open(self.fname, flags, *mode), m)
self.fd = self.file.fileno()
@ -267,8 +390,19 @@ class TahoeFuseFile(object):
@logexc
def read(self, size, offset):
self.log('read(%r, %r)' % (size, offset, ))
self.file.seek(offset)
return self.file.read(size)
if self.downloader:
# then we're busy doing an async download
# (and hence implicitly, we're in an environment that supports twisted)
#self.log('passing read() to %s' % (self.downloader, ))
d = self.downloader.read(offset, size)
def thunk(failure):
raise EIO(str(failure))
d.addErrback(thunk)
return d
else:
self.log('servicing read() from %s' % (self.file, ))
self.file.seek(offset)
return self.file.read(size)
@logexc
def write(self, buf, offset):
@ -320,8 +454,13 @@ class TahoeFuseFile(object):
def fgetattr(self):
self.log("fgetattr()")
s = os.fstat(self.fd)
self.log("fgetattr() -> %r" % (s,))
return stat_to_dict(s)
d = stat_to_dict(s)
if self.downloader:
size = self.downloader.target_size
self.log("fgetattr() during async download, cache file: %s, size=%s" % (s, size))
d['st_size'] = size
self.log("fgetattr() -> %r" % (d,))
return d
@logexc
def ftruncate(self, len):
@ -945,10 +1084,11 @@ class File(object):
class TFS(object):
def __init__(self, nodedir, nodeurl, root_uri,
cache_validity_period=DEFAULT_DIRECTORY_VALIDITY):
cache_validity_period=DEFAULT_DIRECTORY_VALIDITY, async=False):
self.cache_validity = cache_validity_period
self.nodeurl = nodeurl
self.root_uri = root_uri
self.async = async
self.dirs = {}
cachedir = os.path.expanduser(os.path.join(nodedir, '_cache'))
@ -1090,6 +1230,7 @@ class FileCache(object):
self.tmpdir = os.path.join(self.cachedir, 'tmp')
if not os.path.exists(self.tmpdir):
os.makedirs(self.tmpdir)
self.downloaders = weakref.WeakValueDictionary()
def log(self, msg):
log("<FC> %s" % (msg, ))
@ -1099,7 +1240,11 @@ class FileCache(object):
if uri.startswith("URI:LIT"):
return self.get_literal(uri)
else:
return self.get_chk(uri)
return self.get_chk(uri, async=False)
def async_get_file(self, uri):
self.log('get_file(%s)' % (uri,))
return self.get_chk(uri, async=True)
def get_literal(self, uri):
h = sha.new(uri).digest()
@ -1112,7 +1257,7 @@ class FileCache(object):
fh.close()
return fname
def get_chk(self, uri):
def get_chk(self, uri, async=False):
u = CHKFileURI.init_from_string(str(uri))
storage_index = u.storage_index
size = u.size
@ -1120,20 +1265,36 @@ class FileCache(object):
if os.path.exists(fname):
fsize = os.path.getsize(fname)
if fsize == size:
return fname
if async:
return fname, None
else:
return fname
else:
self.log('warning file "%s" is too short %s < %s' % (fname, fsize, size))
self.log('downloading file %s (%s)' % (fname, size, ))
fh = open(fname, 'wb')
url = "%suri/%s" % (self.nodeurl, uri)
download = urllib.urlopen(''.join([ self.nodeurl, "uri/", uri ]))
while True:
chunk = download.read(4096)
if not chunk:
break
fh.write(chunk)
fh.close()
return fname
if async:
if fname in self.downloaders and self.downloaders[fname].running:
downloader = self.downloaders[fname]
else:
downloader = DownloaderWithReadQueue()
self.downloaders[fname] = downloader
d = downloader.start(url, fname, target_size=u.size)
def clear_downloader(result, fname):
self.log('clearing %s from downloaders: %r' % (fname, result))
self.downloaders.pop(fname, None)
d.addBoth(clear_downloader, fname)
return fname, downloader
else:
fh = open(fname, 'wb')
download = urllib.urlopen(url)
while True:
chunk = download.read(4096)
if not chunk:
break
fh.write(chunk)
fh.close()
return fname
def tmp_file(self, id):
fname = os.path.join(self.tmpdir, base64.b32encode(id).lower())
@ -1483,7 +1644,7 @@ def main(argv):
log('\n'+(24*'_')+'init (unsplit)'+(24*'_')+'\n')
cache_timeout = float(config['cache-timeout'])
tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout)
tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout, async=False)
#print tfs.pprint()
# make tfs instance accesible to print_tree() for dbg
@ -1504,7 +1665,7 @@ def main(argv):
try:
cache_timeout = float(config['cache-timeout'])
tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout)
tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout, async=True)
#print tfs.pprint()
# make tfs instance accesible to print_tree() for dbg