repairer.py: wrap to 80cols. No code changes.

Brian Warner 2009-06-30 17:00:47 -07:00
parent a1c6ee17c3
commit 8fca155a66


@@ -12,36 +12,43 @@ from allmydata.immutable import download, upload
 import collections
 
 class Repairer(log.PrefixingLogMixin):
-    """ I generate any shares which were not available and upload them to servers.
+    """I generate any shares which were not available and upload them to
+    servers.
 
-    Which servers? Well, I just use the normal upload process, so any servers that will take
-    shares. In fact, I even believe servers if they say that they already have shares even if
-    attempts to download those shares would fail because the shares are corrupted.
+    Which servers? Well, I just use the normal upload process, so any servers
+    that will take shares. In fact, I even believe servers if they say that
+    they already have shares even if attempts to download those shares would
+    fail because the shares are corrupted.
 
-    My process of uploading replacement shares proceeds in a segment-wise fashion -- first I ask
-    servers if they can hold the new shares, and wait until enough have agreed then I download
-    the first segment of the file and upload the first block of each replacement share, and only
-    after all those blocks have been uploaded do I download the second segment of the file and
-    upload the second block of each replacement share to its respective server. (I do it this
-    way in order to minimize the amount of downloading I have to do and the amount of memory I
-    have to use at any one time.)
+    My process of uploading replacement shares proceeds in a segment-wise
+    fashion -- first I ask servers if they can hold the new shares, and wait
+    until enough have agreed then I download the first segment of the file
+    and upload the first block of each replacement share, and only after all
+    those blocks have been uploaded do I download the second segment of the
+    file and upload the second block of each replacement share to its
+    respective server. (I do it this way in order to minimize the amount of
+    downloading I have to do and the amount of memory I have to use at any
+    one time.)
 
-    If any of the servers to which I am uploading replacement shares fails to accept the blocks
-    during this process, then I just stop using that server, abandon any share-uploads that were
-    going to that server, and proceed to finish uploading the remaining shares to their
-    respective servers. At the end of my work, I produce an object which satisfies the
-    ICheckAndRepairResults interface (by firing the deferred that I returned from start() and
-    passing that check-and-repair-results object).
+    If any of the servers to which I am uploading replacement shares fails to
+    accept the blocks during this process, then I just stop using that
+    server, abandon any share-uploads that were going to that server, and
+    proceed to finish uploading the remaining shares to their respective
+    servers. At the end of my work, I produce an object which satisfies the
+    ICheckAndRepairResults interface (by firing the deferred that I returned
+    from start() and passing that check-and-repair-results object).
 
-    Before I send any new request to a server, I always ask the "monitor" object that was passed
-    into my constructor whether this task has been cancelled (by invoking its
-    raise_if_cancelled() method).
+    Before I send any new request to a server, I always ask the 'monitor'
+    object that was passed into my constructor whether this task has been
+    cancelled (by invoking its raise_if_cancelled() method).
     """
 
     def __init__(self, client, verifycap, monitor):
         assert precondition(isinstance(verifycap, CHKFileVerifierURI))
 
         logprefix = si_b2a(verifycap.storage_index)[:5]
-        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer", prefix=logprefix)
+        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer",
+                                       prefix=logprefix)
         self._client = client
         self._verifycap = verifycap
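
The docstring above is a complete behavioral contract: kick off the repair with start() and wait on the deferred. A minimal, hypothetical driver sketch, assuming nothing beyond what the docstring states (that start() returns a deferred which fires with the check-and-repair-results object); the function name repair_file is illustrative:

    from twisted.internet import defer

    @defer.inlineCallbacks
    def repair_file(client, verifycap, monitor):
        r = Repairer(client, verifycap, monitor)
        results = yield r.start()  # fires once the repair succeeds or fails
        defer.returnValue(results)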
@@ -51,12 +58,14 @@ class Repairer(log.PrefixingLogMixin):
self.log("starting repair")
duc = DownUpConnector()
sb = self._client.get_storage_broker()
dl = download.CiphertextDownloader(sb, self._verifycap, target=duc, monitor=self._monitor)
dl = download.CiphertextDownloader(sb, self._verifycap, target=duc,
monitor=self._monitor)
ul = upload.CHKUploader(self._client)
d = defer.Deferred()
# If the upload or the download fails or is stopped, then the repair failed.
# If the upload or the download fails or is stopped, then the repair
# failed.
def _errb(f):
d.errback(f)
return None
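
_errb above is the entire error-propagation story for repair: a failure from either the download or the upload errbacks the single result deferred d. A standalone sketch of that "first failure wins" wiring (hypothetical helper, not code from this file):

    from twisted.internet import defer

    def tie_failures(result_d, operation_ds):
        # Errback result_d on the first failure from any operation; successes
        # keep flowing through each operation's own callback chain.
        def _errb(f):
            if not result_d.called:
                result_d.errback(f)
            return None  # consume the failure so it is not logged twice
        for op_d in operation_ds:
            op_d.addErrback(_errb)
        return result_d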
@@ -76,63 +85,74 @@ class Repairer(log.PrefixingLogMixin):
 class DownUpConnector(log.PrefixingLogMixin):
     implements(IEncryptedUploadable, IDownloadTarget, IConsumer)
-    """ I act like an "encrypted uploadable" -- something that a local uploader can read
-    ciphertext from in order to upload the ciphertext. However, unbeknownst to the uploader,
-    I actually download the ciphertext from a CiphertextDownloader instance as it is needed.
+    """I act like an 'encrypted uploadable' -- something that a local
+    uploader can read ciphertext from in order to upload the ciphertext.
+    However, unbeknownst to the uploader, I actually download the ciphertext
+    from a CiphertextDownloader instance as it is needed.
 
-    On the other hand, I act like a "download target" -- something that a local downloader can
-    write ciphertext to as it downloads the ciphertext. That downloader doesn't realize, of
-    course, that I'm just turning around and giving the ciphertext to the uploader. """
+    On the other hand, I act like a 'download target' -- something that a
+    local downloader can write ciphertext to as it downloads the ciphertext.
+    That downloader doesn't realize, of course, that I'm just turning around
+    and giving the ciphertext to the uploader."""
 
-    # The theory behind this class is nice: just satisfy two separate interfaces. The
-    # implementation is slightly horrible, because of "impedance mismatch" -- the downloader
-    # expects to be able to synchronously push data in, and the uploader expects to be able to
-    # read data out with a "read(THIS_SPECIFIC_LENGTH)" which returns a deferred. The two
-    # interfaces have different APIs for pausing/unpausing. The uploader requests metadata like
-    # size and encodingparams which the downloader provides either eventually or not at all
-    # (okay I just now extended the downloader to provide encodingparams). Most of this
-    # slightly horrible code would disappear if CiphertextDownloader just used this object as an
-    # IConsumer (plus maybe a couple of other methods) and if the Uploader simply expected to be
-    # treated as an IConsumer (plus maybe a couple of other things).
+    # The theory behind this class is nice: just satisfy two separate
+    # interfaces. The implementation is slightly horrible, because of
+    # "impedance mismatch" -- the downloader expects to be able to
+    # synchronously push data in, and the uploader expects to be able to read
+    # data out with a "read(THIS_SPECIFIC_LENGTH)" which returns a deferred.
+    # The two interfaces have different APIs for pausing/unpausing. The
+    # uploader requests metadata like size and encodingparams which the
+    # downloader provides either eventually or not at all (okay I just now
+    # extended the downloader to provide encodingparams). Most of this
+    # slightly horrible code would disappear if CiphertextDownloader just
+    # used this object as an IConsumer (plus maybe a couple of other methods)
+    # and if the Uploader simply expected to be treated as an IConsumer (plus
+    # maybe a couple of other things).
 
     def __init__(self, buflim=2**19):
-        """ If we're already holding at least buflim bytes, then tell the downloader to pause
-        until we have less than buflim bytes."""
+        """If we're already holding at least buflim bytes, then tell the
+        downloader to pause until we have less than buflim bytes."""
         log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer")
         self.buflim = buflim
         self.bufs = collections.deque() # list of strings
         self.bufsiz = 0 # how many bytes total in bufs
-        self.next_read_ds = collections.deque() # list of deferreds which will fire with the requested ciphertext
-        self.next_read_lens = collections.deque() # how many bytes of ciphertext were requested by each deferred
+        # list of deferreds which will fire with the requested ciphertext
+        self.next_read_ds = collections.deque()
+        # how many bytes of ciphertext were requested by each deferred
+        self.next_read_lens = collections.deque()
         self._size_osol = observer.OneShotObserverList()
         self._encodingparams_osol = observer.OneShotObserverList()
         self._storageindex_osol = observer.OneShotObserverList()
         self._closed_to_pusher = False
-        # once seg size is available, the following attribute will be created to hold it:
+        # once seg size is available, the following attribute will be created
+        # to hold it:
-        # self.encodingparams # (provided by the object which is pushing data into me, required
-        # by the object which is pulling data out of me)
+        # self.encodingparams # (provided by the object which is pushing data
+        # into me, required by the object which is pulling data out of me)
 
         # open() will create the following attribute:
-        # self.size # size of the whole file (provided by the object which is pushing data into
-        # me, required by the object which is pulling data out of me)
+        # self.size # size of the whole file (provided by the object which is
+        # pushing data into me, required by the object which is pulling data
+        # out of me)
 
         # set_upload_status() will create the following attribute:
-        # self.upload_status # XXX do we need to actually update this? Is anybody watching the
-        # results during a repair?
+        # self.upload_status # XXX do we need to actually update this? Is
+        # anybody watching the results during a repair?
 
     def _satisfy_reads_if_possible(self):
         assert bool(self.next_read_ds) == bool(self.next_read_lens)
-        while self.next_read_ds and ((self.bufsiz >= self.next_read_lens[0]) or self._closed_to_pusher):
+        while self.next_read_ds and ((self.bufsiz >= self.next_read_lens[0])
+                                     or self._closed_to_pusher):
             nrd = self.next_read_ds.popleft()
             nrl = self.next_read_lens.popleft()
 
-            # Pick out the requested number of bytes from self.bufs, turn it into a string, and
-            # callback the deferred with that.
+            # Pick out the requested number of bytes from self.bufs, turn it
+            # into a string, and callback the deferred with that.
             res = []
             ressize = 0
             while ressize < nrl and self.bufs:
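
The hunk cuts off mid-loop, but the comment block above already gives the shape of the method: bridge the pusher's synchronous writes to the puller's deferred-returning reads with two deques, one of buffered strings and one of outstanding requests. A self-contained sketch of that technique, simplified (no pausing, no EOF or short-read handling; the class name PushPullBuffer is hypothetical):

    import collections
    from twisted.internet import defer

    class PushPullBuffer:
        # Accept synchronous writes; answer exact-length reads via deferreds.
        def __init__(self):
            self.bufs = collections.deque()     # buffered strings
            self.bufsiz = 0                     # total bytes buffered
            self.waiting = collections.deque()  # (deferred, wanted_len)

        def write(self, data):
            self.bufs.append(data)
            self.bufsiz += len(data)
            self._satisfy()

        def read(self, length):
            d = defer.Deferred()
            self.waiting.append((d, length))
            self._satisfy()
            return d

        def _satisfy(self):
            while self.waiting and self.bufsiz >= self.waiting[0][1]:
                d, length = self.waiting.popleft()
                chunks, got = [], 0
                while got < length:
                    buf = self.bufs.popleft()
                    self.bufsiz -= len(buf)
                    if got + len(buf) > length:
                        keep = length - got
                        self.bufs.appendleft(buf[keep:])  # put excess back
                        self.bufsiz += len(buf) - keep
                        buf = buf[:keep]
                    chunks.append(buf)
                    got += len(buf)
                d.callback("".join(chunks))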
@@ -150,8 +170,9 @@ class DownUpConnector(log.PrefixingLogMixin):
                 self.producer.resumeProducing()
             nrd.callback(res)
 
-    # methods to satisfy the IConsumer and IDownloadTarget interfaces
-    # (From the perspective of a downloader I am an IDownloadTarget and an IConsumer.)
+    # methods to satisfy the IConsumer and IDownloadTarget interfaces. (From
+    # the perspective of a downloader I am an IDownloadTarget and an
+    # IConsumer.)
     def registerProducer(self, producer, streaming):
         assert streaming # We know how to handle only streaming producers.
         self.producer = producer # the downloader
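
registerProducer is the flow-control half of the consumer role: only streaming (push) producers are accepted, so the consumer throttles by pausing and resuming the producer around the buflim threshold described in __init__. A sketch of that backpressure pattern, assuming a Twisted-style IPushProducer (hypothetical class, not the file's actual code):

    class ThrottlingConsumer:
        # Pause a streaming producer while buflim or more bytes are buffered.
        def __init__(self, buflim=2**19):
            self.buflim = buflim
            self.bufsiz = 0
            self.producer = None
            self._paused = False

        def registerProducer(self, producer, streaming):
            assert streaming  # push producers only
            self.producer = producer

        def write(self, data):
            self.bufsiz += len(data)
            if self.bufsiz >= self.buflim and not self._paused:
                self.producer.pauseProducing()
                self._paused = True

        def consumed(self, nbytes):
            # Call when nbytes have been drained out of the buffer.
            self.bufsiz -= nbytes
            if self.bufsiz < self.buflim and self._paused:
                self.producer.resumeProducing()
                self._paused = False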
@@ -192,12 +213,14 @@ class DownUpConnector(log.PrefixingLogMixin):
             return self._size_osol.when_fired()
 
     def get_all_encoding_parameters(self):
         # We have to learn the encoding params from pusher.
-        if hasattr(self, 'encodingparams'): # attribute created by self.set_encodingparams()
+        if hasattr(self, 'encodingparams'):
+            # attribute created by self.set_encodingparams()
             return defer.succeed(self.encodingparams)
         else:
             return self._encodingparams_osol.when_fired()
 
     def read_encrypted(self, length, hash_only):
-        """ Returns a deferred which eventually fires with the requested ciphertext. """
+        """Returns a deferred which eventually fires with the requested
+        ciphertext."""
         precondition(length) # please don't ask to read 0 bytes
         d = defer.Deferred()
         self.next_read_ds.append(d)
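
get_all_encoding_parameters, like get_size and get_storage_index around it, uses one move throughout: answer immediately if the pusher has already supplied the value, otherwise hand out a deferred that fires when it arrives; that is what OneShotObserverList packages up. A hand-rolled equivalent of the pattern (hypothetical class, for illustration only):

    from twisted.internet import defer

    class LazyValue:
        # Early callers get a deferred that fires when the value arrives;
        # late callers get an already-fired deferred.
        def __init__(self):
            self._waiters = []

        def get(self):
            if hasattr(self, 'value'):  # already supplied by the pusher
                return defer.succeed(self.value)
            d = defer.Deferred()
            self._waiters.append(d)
            return d

        def set(self, value):
            self.value = value
            waiters, self._waiters = self._waiters, []
            for d in waiters:
                d.callback(value)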
@@ -206,7 +229,8 @@ class DownUpConnector(log.PrefixingLogMixin):
         return d
 
     def get_storage_index(self):
         # We have to learn the storage index from pusher.
-        if hasattr(self, 'storageindex'): # attribute created by self.set_storageindex()
+        if hasattr(self, 'storageindex'):
+            # attribute created by self.set_storageindex()
             return defer.succeed(self.storageindex)
         else:
             return self._storageindex_osol.when_fired()
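
From the uploader's side, read_encrypted is the only data entry point. A hypothetical pull loop, assuming (as nrd.callback(res) above suggests) that the deferred fires with a list of strings, which may come up short once the pusher has closed:

    from twisted.internet import defer

    @defer.inlineCallbacks
    def pull_ciphertext(duc, total, chunksize=2**16):
        remaining = total
        while remaining > 0:
            chunks = yield duc.read_encrypted(min(chunksize, remaining),
                                              hash_only=False)
            data = "".join(chunks)
            if not data:
                break  # pusher closed early; nothing more is coming
            remaining -= len(data)
            # ... hand data to the encoder here ...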