fix #1223, crash+inefficiency during repair due to read overrun

* repairer (really the uploader) reads beyond end of input file (Uploadable)
* new-downloader does not tolerate overreads
* uploader does lots of tiny reads (inefficient)

This fixes the last two. The uploader still does a single overread at the end
of the input file, but that is now harmless, so it can stay in place. The
uploader now expects the Uploadable to behave like a normal disk file (a read
beyond EOF returns less data than was asked for), and the new-downloader now
behaves that way.
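
For reference, the "normal disk file" behaviour assumed here is the usual
short-read-at-EOF contract. A minimal sketch using ordinary Python file I/O
(a hypothetical scratch file, not code from this commit):

    with open("/tmp/eof-demo", "wb") as f:   # a 135-byte file, like the test below
        f.write(b"a" * 135)

    f = open("/tmp/eof-demo", "rb")
    f.seek(130)
    assert len(f.read(100)) == 5   # overread straddling EOF: short data, no error
    assert f.read(100) == b""      # read starting past EOF: empty string
    f.close()
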
Brian Warner 2010-10-29 01:20:36 -07:00
parent 390c40cd8c
commit c18953c169
4 changed files with 79 additions and 47 deletions

View File

@@ -130,8 +130,9 @@ class DownloadNode:
         # for concurrent operations: each gets its own Segmentation manager
         if size is None:
             size = self._verifycap.size
-        # clip size so offset+size does not go past EOF
-        size = min(size, self._verifycap.size-offset)
+        # ignore overruns: clip size so offset+size does not go past EOF, and
+        # so size is not negative (which indicates that offset >= EOF)
+        size = max(0, min(size, self._verifycap.size-offset))
         if read_ev is None:
             read_ev = self._download_status.add_read_event(offset, size, now())
@@ -143,6 +144,10 @@ class DownloadNode:
             sp = self._history.stats_provider
             sp.count("downloader.files_downloaded", 1) # really read() calls
             sp.count("downloader.bytes_downloaded", size)
+        if size == 0:
+            read_ev.finished(now())
+            # no data, so no producer, so no register/unregisterProducer
+            return defer.succeed(consumer)
         s = Segmentation(self, offset, size, consumer, read_ev, lp)
         # this raises an interesting question: what segments to fetch? if
         # offset=0, always fetch the first segment, and then allow
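
The clipping rule above can be sanity-checked with a few lines of plain Python
(a hypothetical stand-alone check, not part of this commit): the max(0, ...)
turns a read that starts at or past EOF into the zero-byte read handled by the
new early return.

    def clip(offset, size, filesize):
        return max(0, min(size, filesize - offset))

    assert clip(0, 100, 135) == 100   # read entirely inside the file: unchanged
    assert clip(130, 100, 135) == 5   # read straddling EOF: shortened
    assert clip(200, 100, 135) == 0   # read starting past EOF: no data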

View File

@@ -316,6 +316,9 @@ class Encoder(object):
         # of additional shares which can be substituted if the primary ones
         # are unavailable
 
+        # we read data from the source one segment at a time, and then chop
+        # it into 'input_piece_size' pieces before handing it to the codec
+
         crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
 
         # memory footprint: we only hold a tiny piece of the plaintext at any
@@ -350,8 +353,7 @@ class Encoder(object):
         crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
 
         d = self._gather_data(self.required_shares, input_piece_size,
-                              crypttext_segment_hasher,
-                              allow_short=True)
+                              crypttext_segment_hasher, allow_short=True)
         def _done_gathering(chunks):
             for c in chunks:
                 # a short trailing chunk will have been padded by
@@ -369,58 +371,50 @@ class Encoder(object):
     def _gather_data(self, num_chunks, input_chunk_size,
                      crypttext_segment_hasher,
-                     allow_short=False,
-                     previous_chunks=[]):
+                     allow_short=False):
         """Return a Deferred that will fire when the required number of
         chunks have been read (and hashed and encrypted). The Deferred fires
-        with the combination of any 'previous_chunks' and the new chunks
-        which were gathered."""
+        with a list of chunks, each of size input_chunk_size."""
+
+        # I originally built this to allow read_encrypted() to behave badly:
+        # to let it return more or less data than you asked for. It would
+        # stash the leftovers until later, and then recurse until it got
+        # enough. I don't think that was actually useful.
+        #
+        # who defines read_encrypted?
+        #  offloaded.LocalCiphertextReader: real disk file: exact
+        #  upload.EncryptAnUploadable: Uploadable, but a wrapper that makes
+        #   it exact. The return value is a list of 50KiB chunks, to reduce
+        #   the memory footprint of the encryption process.
+        #  repairer.Repairer: immutable.filenode.CiphertextFileNode: exact
+        #
+        # This has been redefined to require read_encrypted() to behave like
+        # a local file: return exactly the amount requested unless it hits
+        # EOF.
+        # -warner
         if self._aborted:
             raise UploadAborted()
-        if not num_chunks:
-            return defer.succeed(previous_chunks)
-        d = self._uploadable.read_encrypted(input_chunk_size, False)
+        read_size = num_chunks * input_chunk_size
+        d = self._uploadable.read_encrypted(read_size, hash_only=False)
        def _got(data):
+            assert isinstance(data, (list,tuple))
             if self._aborted:
                 raise UploadAborted()
-            encrypted_pieces = []
-            length = 0
-            while data:
-                encrypted_piece = data.pop(0)
-                length += len(encrypted_piece)
-                crypttext_segment_hasher.update(encrypted_piece)
-                self._crypttext_hasher.update(encrypted_piece)
-                encrypted_pieces.append(encrypted_piece)
-            precondition(length <= input_chunk_size,
-                         "length=%d > input_chunk_size=%d" %
-                         (length, input_chunk_size))
-            if allow_short:
-                if length < input_chunk_size:
-                    # padding
-                    pad_size = input_chunk_size - length
-                    encrypted_pieces.append('\x00' * pad_size)
-            else:
-                # non-tail segments should be the full segment size
-                if length != input_chunk_size:
-                    log.msg("non-tail segment should be full segment size: %d!=%d"
-                            % (length, input_chunk_size),
-                            level=log.BAD, umid="jNk5Yw")
-                precondition(length == input_chunk_size,
-                             "length=%d != input_chunk_size=%d" %
-                             (length, input_chunk_size))
-            encrypted_piece = "".join(encrypted_pieces)
-            return previous_chunks + [encrypted_piece]
+            data = "".join(data)
+            precondition(len(data) <= read_size, len(data), read_size)
+            if not allow_short:
+                precondition(len(data) == read_size, len(data), read_size)
+            crypttext_segment_hasher.update(data)
+            self._crypttext_hasher.update(data)
+            if allow_short and len(data) < read_size:
+                # padding
+                data += "\x00" * (read_size - len(data))
+            encrypted_pieces = [data[i:i+input_chunk_size]
+                                for i in range(0, len(data), input_chunk_size)]
+            return encrypted_pieces
         d.addCallback(_got)
-        d.addCallback(lambda chunks:
-                      self._gather_data(num_chunks-1, input_chunk_size,
-                                        crypttext_segment_hasher,
-                                        allow_short, chunks))
         return d
 
     def _send_segment(self, (shares, shareids), segnum):
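
The reworked _gather_data above boils down to: issue one read of
num_chunks*input_chunk_size bytes, zero-pad a short tail when allow_short is
set, then slice the result into codec-sized pieces. A stand-alone sketch of
that strategy (a hypothetical synchronous helper, not the Deferred-based code
from this commit):

    def gather(read, num_chunks, input_chunk_size, allow_short=False):
        read_size = num_chunks * input_chunk_size
        data = "".join(read(read_size))    # read() returns a list of strings
        assert len(data) <= read_size      # long reads are never allowed
        if not allow_short:
            assert len(data) == read_size  # short reads only at EOF (tail segment)
        elif len(data) < read_size:
            data += "\x00" * (read_size - len(data))   # pad the short tail
        return [data[i:i+input_chunk_size]
                for i in range(0, len(data), input_chunk_size)]

For the test added below (k=22, 7-byte pieces), that is one 154-byte read per
segment instead of 22 separate 7-byte reads.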

View File

@@ -1622,7 +1622,11 @@ class IUploadable(Interface):
         If the data must be acquired through multiple internal read
         operations, returning a list instead of a single string may help to
-        reduce string copies.
+        reduce string copies. However, the length of the concatenated strings
+        must equal the amount of data requested, unless EOF is encountered.
+        Long reads, or short reads without EOF, are not allowed. read()
+        should return the same amount of data as a local disk file read, just
+        in a different shape and asynchronously.
 
         'length' will typically be equal to (min(get_size(),1MB)/req_shares),
         so a 10kB file means length=3kB, 100kB file means length=30kB,
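
Per the tightened docstring above, a well-behaved read_encrypted() may return
several strings, but their combined length must equal the request unless EOF
was reached. A synchronous toy reader showing that shape (hypothetical; the
real interface returns a Deferred):

    class ExactReader:
        def __init__(self, ciphertext):
            self.data = ciphertext
            self.pos = 0
        def read_encrypted(self, length, hash_only=False):
            piece = self.data[self.pos:self.pos+length]   # short only at EOF
            self.pos += len(piece)
            return [piece]

    r = ExactReader("x" * 135)
    assert sum(len(p) for p in r.read_encrypted(100)) == 100   # exact
    assert sum(len(p) for p in r.read_encrypted(100)) == 35    # EOF: short read
    assert r.read_encrypted(100) == [""]                       # past EOF: no data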

View File

@@ -672,6 +672,35 @@ class Repairer(GridTestMixin, unittest.TestCase, RepairTestMixin,
         return d
     #test_repair_from_corruption_of_1.todo = "Repairer doesn't properly replace corrupted shares yet."
 
+    def test_tiny_reads(self):
+        # ticket #1223 points out three problems:
+        #  repairer reads beyond end of input file
+        #  new-downloader does not tolerate overreads
+        #  uploader does lots of tiny reads, inefficient
+        self.basedir = "repairer/Repairer/test_tiny_reads"
+        self.set_up_grid()
+        c0 = self.g.clients[0]
+        DATA = "a"*135
+        c0.DEFAULT_ENCODING_PARAMETERS['k'] = 22
+        c0.DEFAULT_ENCODING_PARAMETERS['n'] = 66
+        d = c0.upload(upload.Data(DATA, convergence=""))
+        def _then(ur):
+            self.uri = ur.uri
+            self.delete_shares_numbered(self.uri, [0])
+            self.c0_filenode = c0.create_node_from_uri(ur.uri)
+            self._stash_counts()
+            return self.c0_filenode.check_and_repair(Monitor())
+        d.addCallback(_then)
+        def _check(ign):
+            (r,a,w) = self._get_delta_counts()
+            # when the uploader (driven by the repairer) does full-segment
+            # reads, this makes 44 server read calls (2*k). Before, when it
+            # was doing input_chunk_size reads (7 bytes), it was doing over
+            # 400.
+            self.failIf(r > 100, "too many reads: %d>100" % r)
+        d.addCallback(_check)
+        return d
+
     # XXX extend these tests to show that the checker detects which specific
     # share on which specific server is broken -- this is necessary so that the