mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-04-27 14:30:19 +00:00
mutable read: enable the cache (written during mapupdate, read during retrieve). This speeds up small-file reads by about 30% over a link with an average 25ms RTT
This commit is contained in:
parent
e8b0f3d3cb
commit
6af124dc3e
@ -4,7 +4,7 @@ from itertools import count
|
|||||||
from zope.interface import implements
|
from zope.interface import implements
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
from twisted.python import failure
|
from twisted.python import failure
|
||||||
from foolscap.eventual import eventually
|
from foolscap.eventual import eventually, fireEventually
|
||||||
from allmydata.interfaces import IRetrieveStatus
|
from allmydata.interfaces import IRetrieveStatus
|
||||||
from allmydata.util import hashutil, idlib, log
|
from allmydata.util import hashutil, idlib, log
|
||||||
from allmydata import hashtree, codec, storage
|
from allmydata import hashtree, codec, storage
|
||||||
@ -179,20 +179,20 @@ class Retrieve:
|
|||||||
|
|
||||||
# ask the cache first
|
# ask the cache first
|
||||||
got_from_cache = False
|
got_from_cache = False
|
||||||
datav = []
|
datavs = []
|
||||||
#for (offset, length) in readv:
|
for (offset, length) in readv:
|
||||||
# (data, timestamp) = self._node._cache.read(self.verinfo, shnum,
|
(data, timestamp) = self._node._cache.read(self.verinfo, shnum,
|
||||||
# offset, length)
|
offset, length)
|
||||||
# if data is not None:
|
if data is not None:
|
||||||
# datav.append(data)
|
datavs.append(data)
|
||||||
if len(datav) == len(readv):
|
if len(datavs) == len(readv):
|
||||||
self.log("got data from cache")
|
self.log("got data from cache")
|
||||||
got_from_cache = True
|
got_from_cache = True
|
||||||
d = defer.succeed(datav)
|
d = fireEventually({shnum: datavs})
|
||||||
|
# datavs is a dict mapping shnum to a pair of strings
|
||||||
else:
|
else:
|
||||||
self.remaining_sharemap[shnum].remove(peerid)
|
|
||||||
d = self._do_read(ss, peerid, self._storage_index, [shnum], readv)
|
d = self._do_read(ss, peerid, self._storage_index, [shnum], readv)
|
||||||
d.addCallback(self._fill_cache, readv)
|
self.remaining_sharemap.discard(shnum, peerid)
|
||||||
|
|
||||||
d.addCallback(self._got_results, m, peerid, started, got_from_cache)
|
d.addCallback(self._got_results, m, peerid, started, got_from_cache)
|
||||||
d.addErrback(self._query_failed, m, peerid)
|
d.addErrback(self._query_failed, m, peerid)
|
||||||
@ -212,15 +212,6 @@ class Retrieve:
|
|||||||
d.addErrback(log.err)
|
d.addErrback(log.err)
|
||||||
return d # purely for testing convenience
|
return d # purely for testing convenience
|
||||||
|
|
||||||
def _fill_cache(self, datavs, readv):
|
|
||||||
timestamp = time.time()
|
|
||||||
for shnum,datav in datavs.items():
|
|
||||||
for i, (offset, length) in enumerate(readv):
|
|
||||||
data = datav[i]
|
|
||||||
self._node._cache.add(self.verinfo, shnum, offset, data,
|
|
||||||
timestamp)
|
|
||||||
return datavs
|
|
||||||
|
|
||||||
def _do_read(self, ss, peerid, storage_index, shnums, readv):
|
def _do_read(self, ss, peerid, storage_index, shnums, readv):
|
||||||
# isolate the callRemote to a separate method, so tests can subclass
|
# isolate the callRemote to a separate method, so tests can subclass
|
||||||
# Publish and override it
|
# Publish and override it
|
||||||
@ -482,7 +473,8 @@ class Retrieve:
|
|||||||
self._status.timings["total"] = time.time() - self._started
|
self._status.timings["total"] = time.time() - self._started
|
||||||
# res is either the new contents, or a Failure
|
# res is either the new contents, or a Failure
|
||||||
if isinstance(res, failure.Failure):
|
if isinstance(res, failure.Failure):
|
||||||
self.log("Retrieve done, with failure", failure=res)
|
self.log("Retrieve done, with failure", failure=res,
|
||||||
|
level=log.UNUSUAL)
|
||||||
self._status.set_status("Failed")
|
self._status.set_status("Failed")
|
||||||
else:
|
else:
|
||||||
self.log("Retrieve done, success!")
|
self.log("Retrieve done, success!")
|
||||||
|
@ -511,6 +511,7 @@ class ServermapUpdater:
|
|||||||
verinfo = self._got_results_one_share(shnum, data, peerid)
|
verinfo = self._got_results_one_share(shnum, data, peerid)
|
||||||
last_verinfo = verinfo
|
last_verinfo = verinfo
|
||||||
last_shnum = shnum
|
last_shnum = shnum
|
||||||
|
self._node._cache.add(verinfo, shnum, 0, data, now)
|
||||||
except CorruptShareError, e:
|
except CorruptShareError, e:
|
||||||
# log it and give the other shares a chance to be processed
|
# log it and give the other shares a chance to be processed
|
||||||
f = failure.Failure()
|
f = failure.Failure()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user