mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-01-17 10:20:13 +00:00
immutable/filenode.py: put off DownloadStatus creation until first read() call
This avoids spamming the "recent uploads and downloads" /status page from FileNode instances that were created for a directory read but which nobody is ever going to read from. I also cleaned up the way DownloadStatus instances are made to only ever do it in the CiphertextFileNode, not in the higher-level plaintext FileNode. Also fixed DownloadStatus handling of read size, thanks to David-Sarah for the catch.
This commit is contained in:
parent
ed821d1504
commit
5267fc17fe
@ -3,7 +3,7 @@ import binascii
|
|||||||
import copy
|
import copy
|
||||||
import time
|
import time
|
||||||
now = time.time
|
now = time.time
|
||||||
from zope.interface import implements
|
from zope.interface import implements, Interface
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
from twisted.internet.interfaces import IConsumer
|
from twisted.internet.interfaces import IConsumer
|
||||||
|
|
||||||
@ -19,36 +19,49 @@ from allmydata.immutable.repairer import Repairer
|
|||||||
from allmydata.immutable.downloader.node import DownloadNode
|
from allmydata.immutable.downloader.node import DownloadNode
|
||||||
from allmydata.immutable.downloader.status import DownloadStatus
|
from allmydata.immutable.downloader.status import DownloadStatus
|
||||||
|
|
||||||
|
class IDownloadStatusHandlingConsumer(Interface):
|
||||||
|
def set_download_status_read_event(read_ev):
|
||||||
|
"""Record the DownloadStatus 'read event', to be updated with the
|
||||||
|
time it takes to decrypt each chunk of data."""
|
||||||
|
|
||||||
class CiphertextFileNode:
|
class CiphertextFileNode:
|
||||||
def __init__(self, verifycap, storage_broker, secret_holder,
|
def __init__(self, verifycap, storage_broker, secret_holder,
|
||||||
terminator, history, download_status=None):
|
terminator, history):
|
||||||
assert isinstance(verifycap, uri.CHKFileVerifierURI)
|
assert isinstance(verifycap, uri.CHKFileVerifierURI)
|
||||||
self._verifycap = verifycap
|
self._verifycap = verifycap
|
||||||
self._storage_broker = storage_broker
|
self._storage_broker = storage_broker
|
||||||
self._secret_holder = secret_holder
|
self._secret_holder = secret_holder
|
||||||
if download_status is None:
|
|
||||||
ds = DownloadStatus(verifycap.storage_index, verifycap.size)
|
|
||||||
if history:
|
|
||||||
history.add_download(ds)
|
|
||||||
download_status = ds
|
|
||||||
self._terminator = terminator
|
self._terminator = terminator
|
||||||
self._history = history
|
self._history = history
|
||||||
self._download_status = download_status
|
self._download_status = None
|
||||||
self._node = None # created lazily, on read()
|
self._node = None # created lazily, on read()
|
||||||
|
|
||||||
def _maybe_create_download_node(self):
|
def _maybe_create_download_node(self):
|
||||||
|
if not self._download_status:
|
||||||
|
ds = DownloadStatus(self._verifycap.storage_index,
|
||||||
|
self._verifycap.size)
|
||||||
|
if self._history:
|
||||||
|
self._history.add_download(ds)
|
||||||
|
self._download_status = ds
|
||||||
if self._node is None:
|
if self._node is None:
|
||||||
self._node = DownloadNode(self._verifycap, self._storage_broker,
|
self._node = DownloadNode(self._verifycap, self._storage_broker,
|
||||||
self._secret_holder,
|
self._secret_holder,
|
||||||
self._terminator,
|
self._terminator,
|
||||||
self._history, self._download_status)
|
self._history, self._download_status)
|
||||||
|
|
||||||
def read(self, consumer, offset=0, size=None, read_ev=None):
|
def read(self, consumer, offset=0, size=None):
|
||||||
"""I am the main entry point, from which FileNode.read() can get
|
"""I am the main entry point, from which FileNode.read() can get
|
||||||
data. I feed the consumer with the desired range of ciphertext. I
|
data. I feed the consumer with the desired range of ciphertext. I
|
||||||
return a Deferred that fires (with the consumer) when the read is
|
return a Deferred that fires (with the consumer) when the read is
|
||||||
finished."""
|
finished."""
|
||||||
self._maybe_create_download_node()
|
self._maybe_create_download_node()
|
||||||
|
actual_size = size
|
||||||
|
if actual_size is None:
|
||||||
|
actual_size = self._verifycap.size - offset
|
||||||
|
read_ev = self._download_status.add_read_event(offset, actual_size,
|
||||||
|
now())
|
||||||
|
if IDownloadStatusHandlingConsumer.providedBy(consumer):
|
||||||
|
consumer.set_download_status_read_event(read_ev)
|
||||||
return self._node.read(consumer, offset, size, read_ev)
|
return self._node.read(consumer, offset, size, read_ev)
|
||||||
|
|
||||||
def get_segment(self, segnum):
|
def get_segment(self, segnum):
|
||||||
@ -155,17 +168,16 @@ class CiphertextFileNode:
|
|||||||
monitor=monitor)
|
monitor=monitor)
|
||||||
return v.start()
|
return v.start()
|
||||||
|
|
||||||
|
|
||||||
class DecryptingConsumer:
|
class DecryptingConsumer:
|
||||||
"""I sit between a CiphertextDownloader (which acts as a Producer) and
|
"""I sit between a CiphertextDownloader (which acts as a Producer) and
|
||||||
the real Consumer, decrypting everything that passes by. The real
|
the real Consumer, decrypting everything that passes by. The real
|
||||||
Consumer sees the real Producer, but the Producer sees us instead of the
|
Consumer sees the real Producer, but the Producer sees us instead of the
|
||||||
real consumer."""
|
real consumer."""
|
||||||
implements(IConsumer)
|
implements(IConsumer, IDownloadStatusHandlingConsumer)
|
||||||
|
|
||||||
def __init__(self, consumer, readkey, offset, read_event):
|
def __init__(self, consumer, readkey, offset):
|
||||||
self._consumer = consumer
|
self._consumer = consumer
|
||||||
self._read_event = read_event
|
self._read_event = None
|
||||||
# TODO: pycryptopp CTR-mode needs random-access operations: I want
|
# TODO: pycryptopp CTR-mode needs random-access operations: I want
|
||||||
# either a=AES(readkey, offset) or better yet both of:
|
# either a=AES(readkey, offset) or better yet both of:
|
||||||
# a=AES(readkey, offset=0)
|
# a=AES(readkey, offset=0)
|
||||||
@ -177,6 +189,9 @@ class DecryptingConsumer:
|
|||||||
self._decryptor = AES(readkey, iv=iv)
|
self._decryptor = AES(readkey, iv=iv)
|
||||||
self._decryptor.process("\x00"*offset_small)
|
self._decryptor.process("\x00"*offset_small)
|
||||||
|
|
||||||
|
def set_download_status_read_event(self, read_ev):
|
||||||
|
self._read_event = read_ev
|
||||||
|
|
||||||
def registerProducer(self, producer, streaming):
|
def registerProducer(self, producer, streaming):
|
||||||
# this passes through, so the real consumer can flow-control the real
|
# this passes through, so the real consumer can flow-control the real
|
||||||
# producer. Therefore we don't need to provide any IPushProducer
|
# producer. Therefore we don't need to provide any IPushProducer
|
||||||
@ -188,8 +203,9 @@ class DecryptingConsumer:
|
|||||||
def write(self, ciphertext):
|
def write(self, ciphertext):
|
||||||
started = now()
|
started = now()
|
||||||
plaintext = self._decryptor.process(ciphertext)
|
plaintext = self._decryptor.process(ciphertext)
|
||||||
elapsed = now() - started
|
if self._read_event:
|
||||||
self._read_event.update(0, elapsed, 0)
|
elapsed = now() - started
|
||||||
|
self._read_event.update(0, elapsed, 0)
|
||||||
self._consumer.write(plaintext)
|
self._consumer.write(plaintext)
|
||||||
|
|
||||||
class ImmutableFileNode:
|
class ImmutableFileNode:
|
||||||
@ -200,12 +216,8 @@ class ImmutableFileNode:
|
|||||||
history):
|
history):
|
||||||
assert isinstance(filecap, uri.CHKFileURI)
|
assert isinstance(filecap, uri.CHKFileURI)
|
||||||
verifycap = filecap.get_verify_cap()
|
verifycap = filecap.get_verify_cap()
|
||||||
ds = DownloadStatus(verifycap.storage_index, verifycap.size)
|
|
||||||
if history:
|
|
||||||
history.add_download(ds)
|
|
||||||
self._download_status = ds
|
|
||||||
self._cnode = CiphertextFileNode(verifycap, storage_broker,
|
self._cnode = CiphertextFileNode(verifycap, storage_broker,
|
||||||
secret_holder, terminator, history, ds)
|
secret_holder, terminator, history)
|
||||||
assert isinstance(filecap, uri.CHKFileURI)
|
assert isinstance(filecap, uri.CHKFileURI)
|
||||||
self.u = filecap
|
self.u = filecap
|
||||||
self._readkey = filecap.key
|
self._readkey = filecap.key
|
||||||
@ -226,14 +238,8 @@ class ImmutableFileNode:
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
def read(self, consumer, offset=0, size=None):
|
def read(self, consumer, offset=0, size=None):
|
||||||
actual_size = size
|
decryptor = DecryptingConsumer(consumer, self._readkey, offset)
|
||||||
if actual_size == None:
|
d = self._cnode.read(decryptor, offset, size)
|
||||||
actual_size = self.u.size
|
|
||||||
actual_size = actual_size - offset
|
|
||||||
read_ev = self._download_status.add_read_event(offset,actual_size,
|
|
||||||
now())
|
|
||||||
decryptor = DecryptingConsumer(consumer, self._readkey, offset, read_ev)
|
|
||||||
d = self._cnode.read(decryptor, offset, size, read_ev)
|
|
||||||
d.addCallback(lambda dc: consumer)
|
d.addCallback(lambda dc: consumer)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user