#527: expire the cached files that are used to support Range: headers, every hour, when the file is unused and older than an hour

This commit is contained in:
Brian Warner 2008-10-30 13:39:09 -07:00
parent c205a54965
commit ba019bfd3a
3 changed files with 34 additions and 26 deletions

View File

@ -18,7 +18,7 @@ from allmydata.immutable.filenode import FileNode, LiteralFileNode
from allmydata.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
from allmydata.util import hashutil, base32, pollmixin, fileutil
from allmydata.util import hashutil, base32, pollmixin, cachedir
from allmydata.uri import LiteralFileURI
from allmydata.dirnode import NewDirectoryNode
from allmydata.mutable.node import MutableFileNode, MutableWatcher
@ -188,9 +188,10 @@ class Client(node.Node, pollmixin.PollMixin):
self.convergence = base32.a2b(convergence_s)
self._node_cache = weakref.WeakValueDictionary() # uri -> node
self.add_service(Uploader(helper_furl, self.stats_provider))
self.download_cachedir = os.path.join(self.basedir,
download_cachedir = os.path.join(self.basedir,
"private", "cache", "download")
fileutil.make_dirs(self.download_cachedir)
self.download_cache = cachedir.CacheDirectoryManager(download_cachedir)
self.download_cache.setServiceParent(self)
self.add_service(Downloader(self.stats_provider))
self.add_service(MutableWatcher(self.stats_provider))
def _publish(res):
@ -339,9 +340,8 @@ class Client(node.Node, pollmixin.PollMixin):
if isinstance(u, LiteralFileURI):
node = LiteralFileNode(u, self) # LIT
else:
cachefile = os.path.join(self.download_cachedir,
base32.b2a(u.storage_index))
# TODO: cachefile manager, weakref, expire policy
key = base32.b2a(u.storage_index)
cachefile = self.download_cache.get_file(key)
node = FileNode(u, self, cachefile) # CHK
else:
assert IMutableFileURI.providedBy(u), u

View File

@ -62,13 +62,15 @@ class PortionOfFile:
self.bytes_left -= len(data)
return data
class CacheFile:
class DownloadCache:
implements(IDownloadTarget)
def __init__(self, node, filename):
self.node = node
def __init__(self, node, cachefile):
self._downloader = node._client.getServiceNamed("downloader")
self._uri = node.get_uri()
self._storage_index = node.get_storage_index()
self.milestones = set() # of (offset,size,Deferred)
self.cachefilename = filename
self.cachefile = cachefile
self.download_in_progress = False
# five states:
# new FileNode, no downloads ever performed
@ -88,10 +90,9 @@ class CacheFile:
self.download_in_progress = True
log.msg(format=("immutable filenode read [%(si)s]: " +
"starting download"),
si=base32.b2a(self.node.u.storage_index),
si=base32.b2a(self._storage_index),
umid="h26Heg", level=log.OPERATIONAL)
downloader = self.node._client.getServiceNamed("downloader")
d2 = downloader.download(self.node.get_uri(), self)
d2 = self._downloader.download(self._uri, self)
d2.addBoth(self._download_done)
d2.addErrback(self._download_failed)
d2.addErrback(log.err, umid="cQaM9g")
@ -99,7 +100,7 @@ class CacheFile:
def read(self, consumer, offset, size):
assert offset+size <= self.get_filesize()
f = PortionOfFile(self.cachefilename, offset, size)
f = PortionOfFile(self.cachefile.get_filename(), offset, size)
d = basic.FileSender().beginFileTransfer(f, consumer)
d.addCallback(lambda lastSent: consumer)
return d
@ -124,7 +125,7 @@ class CacheFile:
log.msg(format=("immutable filenode read [%(si)s] " +
"%(offset)d+%(size)d vs %(filesize)d: " +
"done"),
si=base32.b2a(self.node.u.storage_index),
si=base32.b2a(self._storage_index),
offset=offset, size=size, filesize=current_size,
umid="nuedUg", level=log.NOISY)
self.milestones.discard(m)
@ -133,20 +134,20 @@ class CacheFile:
log.msg(format=("immutable filenode read [%(si)s] " +
"%(offset)d+%(size)d vs %(filesize)d: " +
"still waiting"),
si=base32.b2a(self.node.u.storage_index),
si=base32.b2a(self._storage_index),
offset=offset, size=size, filesize=current_size,
umid="8PKOhg", level=log.NOISY)
def get_filesize(self):
try:
filesize = os.stat(self.cachefilename)[stat.ST_SIZE]
filesize = os.stat(self.cachefile.get_filename())[stat.ST_SIZE]
except OSError:
filesize = 0
return filesize
def open(self, size):
self.f = open(self.cachefilename, "wb")
self.f = open(self.cachefile.get_filename(), "wb")
def write(self, data):
self.f.write(data)
@ -171,7 +172,7 @@ class FileNode(_ImmutableFileNodeBase):
def __init__(self, uri, client, cachefile):
_ImmutableFileNodeBase.__init__(self, uri, client)
self.cachefile = CacheFile(self, cachefile)
self.download_cache = DownloadCache(self, cachefile)
def get_uri(self):
return self.u.to_string()
@ -230,8 +231,9 @@ class FileNode(_ImmutableFileNodeBase):
umid="VRSBwg", level=log.OPERATIONAL)
return self.download(download.ConsumerAdapter(consumer))
d = self.cachefile.when_range_available(offset, size)
d.addCallback(lambda res: self.cachefile.read(consumer, offset, size))
d = self.download_cache.when_range_available(offset, size)
d.addCallback(lambda res:
self.download_cache.read(consumer, offset, size))
return d
def download(self, target):

View File

@ -4,12 +4,17 @@ from allmydata import uri
from allmydata.monitor import Monitor
from allmydata.immutable import filenode, download
from allmydata.mutable.node import MutableFileNode
from allmydata.util import hashutil
from allmydata.util import hashutil, cachedir
from allmydata.test.common import download_to_data
class NotANode:
pass
class FakeClient:
# just enough to let the node acquire a downloader (which it won't use)
def getServiceNamed(self, name):
return None
class Node(unittest.TestCase):
def test_chk_filenode(self):
u = uri.CHKFileURI(key="\x00"*16,
@ -17,9 +22,10 @@ class Node(unittest.TestCase):
needed_shares=3,
total_shares=10,
size=1000)
c = None
fn1 = filenode.FileNode(u, c, "cachefile")
fn2 = filenode.FileNode(u.to_string(), c, "cachefile")
c = FakeClient()
cf = cachedir.CacheFile("none")
fn1 = filenode.FileNode(u, c, cf)
fn2 = filenode.FileNode(u.to_string(), c, cf)
self.failUnlessEqual(fn1, fn2)
self.failIfEqual(fn1, "I am not a filenode")
self.failIfEqual(fn1, NotANode())