mirror of https://github.com/tahoe-lafs/tahoe-lafs.git, synced 2024-12-27 08:22:32 +00:00
0d5dc51617
* stop using IURI as an adapter
* pass cap strings around instead of URI instances
* move filenode/dirnode creation duties from Client to new NodeMaker class
* move other Client duties to KeyGenerator, SecretHolder, History classes
* stop passing Client reference to dirnode/filenode constructors
  - pass less-powerful references instead, like StorageBroker or Uploader
* always create DirectoryNodes by wrapping a filenode (mutable for now)
* remove some specialized mock classes from unit tests

Detailed list of changes (done one at a time, then merged together)

always pass a string to create_node_from_uri(), not an IURI instance
always pass a string to IFilesystemNode constructors, not an IURI instance
stop using IURI() as an adapter, switch on cap prefix in create_node_from_uri()
client.py: move SecretHolder code out to a separate class
test_web.py: hush pyflakes
client.py: move NodeMaker functionality out into a separate object
LiteralFileNode: stop storing a Client reference
immutable Checker: remove Client reference, it only needs a SecretHolder
immutable Upload: remove Client reference, leave SecretHolder and StorageBroker
immutable Repairer: replace Client reference with StorageBroker and SecretHolder
immutable FileNode: remove Client reference
mutable.Publish: stop passing Client
mutable.ServermapUpdater: get StorageBroker in constructor, not by peeking into Client reference
MutableChecker: reference StorageBroker and History directly, not through Client
mutable.FileNode: removed unused indirection to checker classes
mutable.FileNode: remove Client reference
client.py: move RSA key generation into a separate class, so it can be passed to the nodemaker
move create_mutable_file() into NodeMaker
test_dirnode.py: stop using FakeClient mockups, use NoNetworkGrid instead. This simplifies the code, but takes longer to run (17s instead of 6s). This should come down later when other cleanups make it possible to use simpler (non-RSA) fake mutable files for dirnode tests.
test_mutable.py: clean up basedir names
client.py: move create_empty_dirnode() into NodeMaker
dirnode.py: get rid of DirectoryNode.create
remove DirectoryNode.init_from_uri, refactor NodeMaker for customization, simplify test_web's mock Client to match
stop passing Client to DirectoryNode, make DirectoryNode.create_with_mutablefile the normal DirectoryNode constructor, start removing client from NodeMaker
remove Client from NodeMaker
move helper status into History, pass History to web.Status instead of Client
test_mutable.py: fix minor typo
238 lines
11 KiB
Python
from zope.interface import implements
from twisted.internet import defer
from allmydata.storage.server import si_b2a
from allmydata.util import log, observer
from allmydata.util.assertutil import precondition, _assert
from allmydata.uri import CHKFileVerifierURI
from allmydata.interfaces import IEncryptedUploadable, IDownloadTarget
from twisted.internet.interfaces import IConsumer

from allmydata.immutable import download, upload

import collections

class Repairer(log.PrefixingLogMixin):
    """I generate any shares which were not available and upload them to
    servers.

    Which servers? Well, I just use the normal upload process, so any servers
    that will take shares. In fact, I even believe servers if they say that
    they already have shares, even if attempts to download those shares would
    fail because the shares are corrupted.

    My process of uploading replacement shares proceeds in a segment-wise
    fashion -- first I ask servers if they can hold the new shares, and wait
    until enough have agreed. Then I download the first segment of the file
    and upload the first block of each replacement share, and only after all
    those blocks have been uploaded do I download the second segment of the
    file and upload the second block of each replacement share to its
    respective server. (I do it this way in order to minimize the amount of
    downloading I have to do and the amount of memory I have to use at any
    one time.)

    If any of the servers to which I am uploading replacement shares fails to
    accept the blocks during this process, then I just stop using that
    server, abandon any share-uploads that were going to that server, and
    proceed to finish uploading the remaining shares to their respective
    servers. At the end of my work, I produce an object which satisfies the
    ICheckAndRepairResults interface (by firing the deferred that I returned
    from start() and passing that check-and-repair-results object).

    Before I send any new request to a server, I always ask the 'monitor'
    object that was passed into my constructor whether this task has been
    cancelled (by invoking its raise_if_cancelled() method).
    """

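    # A minimal usage sketch (illustrative only, not taken from a real
    # caller): construct me with the grid's StorageBroker, a SecretHolder,
    # the file's CHKFileVerifierURI, and a Monitor, then call start() and
    # wait on the deferred it returns:
    #
    #   r = Repairer(storage_broker, secret_holder, verifycap, monitor)
    #   d = r.start()
    #   d.addCallback(lambda results: ...)  # repair succeeded
    #   d.addErrback(lambda f: ...)         # repair failed or was stopped
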
    def __init__(self, storage_broker, secret_holder, verifycap, monitor):
        precondition(isinstance(verifycap, CHKFileVerifierURI))

        logprefix = si_b2a(verifycap.storage_index)[:5]
        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer",
                                       prefix=logprefix)

        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._verifycap = verifycap
        self._monitor = monitor

    def start(self):
        self.log("starting repair")
        duc = DownUpConnector()
        dl = download.CiphertextDownloader(self._storage_broker,
                                           self._verifycap, target=duc,
                                           monitor=self._monitor)
        ul = upload.CHKUploader(self._storage_broker, self._secret_holder)

        d = defer.Deferred()

        # If the upload or the download fails or is stopped, then the repair
        # failed.
        def _errb(f):
            d.errback(f)
            return None

        # If the upload succeeds, then the repair has succeeded.
        def _cb(res):
            d.callback(res)
        ul.start(duc).addCallbacks(_cb, _errb)

        # If the download fails or is stopped, then the repair failed.
        d2 = dl.start()
        d2.addErrback(_errb)

        # We ignore the callback from d2. Is this right? Ugh.

        return d

class DownUpConnector(log.PrefixingLogMixin):
    implements(IEncryptedUploadable, IDownloadTarget, IConsumer)
    """I act like an 'encrypted uploadable' -- something that a local
    uploader can read ciphertext from in order to upload the ciphertext.
    However, unbeknownst to the uploader, I actually download the ciphertext
    from a CiphertextDownloader instance as it is needed.

    On the other hand, I act like a 'download target' -- something that a
    local downloader can write ciphertext to as it downloads the ciphertext.
    That downloader doesn't realize, of course, that I'm just turning around
    and giving the ciphertext to the uploader."""

    # The theory behind this class is nice: just satisfy two separate
    # interfaces. The implementation is slightly horrible, because of
    # "impedance mismatch" -- the downloader expects to be able to
    # synchronously push data in, and the uploader expects to be able to read
    # data out with a "read(THIS_SPECIFIC_LENGTH)" which returns a deferred.
    # The two interfaces have different APIs for pausing/unpausing. The
    # uploader requests metadata like size and encodingparams which the
    # downloader provides either eventually or not at all (okay I just now
    # extended the downloader to provide encodingparams). Most of this
    # slightly horrible code would disappear if CiphertextDownloader just
    # used this object as an IConsumer (plus maybe a couple of other methods)
    # and if the Uploader simply expected to be treated as an IConsumer (plus
    # maybe a couple of other things).

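    # Rough data-flow sketch (informal):
    #
    #   CiphertextDownloader --write(data)--> self.bufs --read_encrypted(n)--> CHKUploader
    #
    # write() appends ciphertext to self.bufs; read_encrypted() queues a
    # deferred which _satisfy_reads_if_possible() fires once enough bytes
    # have accumulated (or once close() forces a short read). When self.bufs
    # holds >= buflim bytes we pauseProducing() the downloader, and we
    # resumeProducing() again once a read drains it below buflim.
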
    def __init__(self, buflim=2**19):
        """If we're already holding at least buflim bytes, then tell the
        downloader to pause until we have less than buflim bytes."""
        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer")
        self.buflim = buflim
        self.bufs = collections.deque() # list of strings
        self.bufsiz = 0 # how many bytes total in bufs

        # list of deferreds which will fire with the requested ciphertext
        self.next_read_ds = collections.deque()

        # how many bytes of ciphertext were requested by each deferred
        self.next_read_lens = collections.deque()

        self._size_osol = observer.OneShotObserverList()
        self._encodingparams_osol = observer.OneShotObserverList()
        self._storageindex_osol = observer.OneShotObserverList()
        self._closed_to_pusher = False

        # once seg size is available, the following attribute will be created
        # to hold it:

        # self.encodingparams # (provided by the object which is pushing data
        # into me, required by the object which is pulling data out of me)

        # open() will create the following attribute:
        # self.size # size of the whole file (provided by the object which is
        # pushing data into me, required by the object which is pulling data
        # out of me)

        # set_upload_status() will create the following attribute:

        # self.upload_status # XXX do we need to actually update this? Is
        # anybody watching the results during a repair?

    def _satisfy_reads_if_possible(self):
        assert bool(self.next_read_ds) == bool(self.next_read_lens)
        while self.next_read_ds and ((self.bufsiz >= self.next_read_lens[0])
                                     or self._closed_to_pusher):
            nrd = self.next_read_ds.popleft()
            nrl = self.next_read_lens.popleft()

            # Pick out the requested number of bytes from self.bufs, turn it
            # into a string, and callback the deferred with that.
            res = []
            ressize = 0
            while ressize < nrl and self.bufs:
                nextbuf = self.bufs.popleft()
                res.append(nextbuf)
                ressize += len(nextbuf)
                if ressize > nrl:
                    extra = ressize - nrl
                    # Put the unrequested tail of this buffer back on the
                    # left of self.bufs for the next read, and keep only the
                    # head in the result.
                    self.bufs.appendleft(nextbuf[-extra:])
                    res[-1] = nextbuf[:-extra]
            _assert(sum(len(x) for x in res) <= nrl, [len(x) for x in res], nrl)
            _assert(sum(len(x) for x in res) == nrl or self._closed_to_pusher, [len(x) for x in res], nrl)
            self.bufsiz -= nrl
            if self.bufsiz < self.buflim and self.producer:
                self.producer.resumeProducing()
            nrd.callback(res)

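    # Worked example: suppose self.bufs holds ["aaaa", "bbbb"] (bufsiz=8) and
    # a read of 6 bytes is pending. The loop above pops "aaaa" and "bbbb",
    # sees ressize=8 > nrl=6, pushes the 2-byte tail "bb" back onto the left
    # of self.bufs for the next read, and fires the deferred with
    # ["aaaa", "bb"].
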
    # methods to satisfy the IConsumer and IDownloadTarget interfaces. (From
    # the perspective of a downloader I am an IDownloadTarget and an
    # IConsumer.)
    def registerProducer(self, producer, streaming):
        assert streaming # We know how to handle only streaming producers.
        self.producer = producer # the downloader
    def unregisterProducer(self):
        self.producer = None
    def open(self, size):
        self.size = size
        self._size_osol.fire(self.size)
    def set_encodingparams(self, encodingparams):
        self.encodingparams = encodingparams
        self._encodingparams_osol.fire(self.encodingparams)
    def set_storageindex(self, storageindex):
        self.storageindex = storageindex
        self._storageindex_osol.fire(self.storageindex)
    def write(self, data):
        precondition(data) # please don't write empty strings
        self.bufs.append(data)
        self.bufsiz += len(data)
        self._satisfy_reads_if_possible()
        if self.bufsiz >= self.buflim and self.producer:
            self.producer.pauseProducing()
    def finish(self):
        pass
    def close(self):
        self._closed_to_pusher = True
        # Any reads which haven't been satisfied by now are going to
        # have to be satisfied with short reads.
        self._satisfy_reads_if_possible()

    # methods to satisfy the IEncryptedUploadable interface
    # (From the perspective of an uploader I am an IEncryptedUploadable.)
    def set_upload_status(self, upload_status):
        self.upload_status = upload_status
    def get_size(self):
        if hasattr(self, 'size'): # attribute created by self.open()
            return defer.succeed(self.size)
        else:
            return self._size_osol.when_fired()
    def get_all_encoding_parameters(self):
        # We have to learn the encoding params from pusher.
        if hasattr(self, 'encodingparams'):
            # attribute created by self.set_encodingparams()
            return defer.succeed(self.encodingparams)
        else:
            return self._encodingparams_osol.when_fired()

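    # Note the pattern shared by get_size(), get_all_encoding_parameters()
    # and get_storage_index(): if the pusher has already provided the value,
    # answer immediately with defer.succeed(); otherwise hand back the
    # corresponding OneShotObserverList's when_fired() deferred, which the
    # matching setter (open / set_encodingparams / set_storageindex) will
    # eventually fire.
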
    def read_encrypted(self, length, hash_only):
        """Return a deferred which eventually fires with the requested
        ciphertext (a list of strings)."""
        precondition(length) # please don't ask to read 0 bytes
        d = defer.Deferred()
        self.next_read_ds.append(d)
        self.next_read_lens.append(length)
        self._satisfy_reads_if_possible()
        return d

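    # Informal caller sketch: a reader wanting the next N bytes does
    #
    #   d = duc.read_encrypted(N, hash_only=False)
    #   d.addCallback(lambda strs: "".join(strs))
    #
    # and receives a list of strings totalling at most N bytes (fewer only
    # after close(), when the remaining reads are satisfied short).
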
    def get_storage_index(self):
        # We have to learn the storage index from pusher.
        if hasattr(self, 'storageindex'):
            # attribute created by self.set_storageindex()
            return defer.succeed(self.storageindex)
        else:
            return self._storageindex_osol.when_fired()