MDMFSlotReadProxy: remove the queue

The queue was a neat trick for reducing Foolscap overhead, but the need for
an explicit flush() complicates the Retrieve path and makes it prone to
lost-progress bugs.

Also change test_mutable.FakeStorage to tolerate multiple reads of the same
share in a row, a limitation exposed by turning off the queue.
Brian Warner 2011-09-05 12:04:08 -07:00
parent 1597aafea1
commit 2b4f2b7fa3
5 changed files with 30 additions and 104 deletions
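
For orientation, the shape of the change: every read-side accessor on
MDMFSlotReadProxy loses its queue= parameter, and the callers lose their
flush() calls. A distilled before/after, taken from the diffs below:

    # before: reads are parked in a queue; nothing goes over the wire
    # until someone remembers to call flush()
    d1 = reader.get_signature(queue=True)
    d2 = reader.get_verification_key(queue=True)
    reader.flush()   # skip this on any code path and d1/d2 never fire

    # after: each accessor issues its own slot_readv immediately
    d1 = reader.get_signature()
    d2 = reader.get_verification_key()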

src/allmydata/mutable/layout.py

@@ -3,7 +3,7 @@ import struct
 from allmydata.mutable.common import NeedMoreDataError, UnknownVersionError
 from allmydata.interfaces import HASH_SIZE, SALT_SIZE, SDMF_VERSION, \
                                  MDMF_VERSION, IMutableSlotWriter
-from allmydata.util import mathutil, observer
+from allmydata.util import mathutil
 from twisted.python import failure
 from twisted.internet import defer
 from zope.interface import implements
@@ -1212,10 +1212,6 @@ class MDMFSlotReadProxy:
         if self._data == None:
             self._data = ""
 
-        self._queue_observers = observer.ObserverList()
-        self._queue_errbacks = observer.ObserverList()
-        self._readvs = []
-
     def _maybe_fetch_offsets_and_header(self, force_remote=False):
         """
@@ -1353,7 +1349,7 @@ class MDMFSlotReadProxy:
         self._offsets['share_data'] = sharedata
 
-    def get_block_and_salt(self, segnum, queue=False):
+    def get_block_and_salt(self, segnum):
         """
         I return (block, salt), where block is the block data and
         salt is the salt used to encrypt that segment.
@@ -1381,8 +1377,7 @@ class MDMFSlotReadProxy:
             readvs = [(share_offset, data)]
             return readvs
         d.addCallback(_then)
-        d.addCallback(lambda readvs:
-            self._read(readvs, queue=queue))
+        d.addCallback(lambda readvs: self._read(readvs))
         def _process_results(results):
             assert self.shnum in results
             if self._version_number == 0:
@@ -1408,7 +1403,7 @@ class MDMFSlotReadProxy:
         return d
 
-    def get_blockhashes(self, needed=None, queue=False, force_remote=False):
+    def get_blockhashes(self, needed=None, force_remote=False):
         """
         I return the block hash tree
@@ -1440,7 +1435,7 @@ class MDMFSlotReadProxy:
             return readvs
         d.addCallback(_then)
         d.addCallback(lambda readvs:
-            self._read(readvs, queue=queue, force_remote=force_remote))
+            self._read(readvs, force_remote=force_remote))
         def _build_block_hash_tree(results):
             assert self.shnum in results
@@ -1452,7 +1447,7 @@ class MDMFSlotReadProxy:
         return d
 
-    def get_sharehashes(self, needed=None, queue=False, force_remote=False):
+    def get_sharehashes(self, needed=None, force_remote=False):
         """
         I return the part of the share hash chain placed to validate
         this share.
@@ -1479,7 +1474,7 @@ class MDMFSlotReadProxy:
             return readvs
         d.addCallback(_make_readvs)
         d.addCallback(lambda readvs:
-            self._read(readvs, queue=queue, force_remote=force_remote))
+            self._read(readvs, force_remote=force_remote))
         def _build_share_hash_chain(results):
             assert self.shnum in results
@@ -1493,7 +1488,7 @@ class MDMFSlotReadProxy:
         return d
 
-    def get_encprivkey(self, queue=False):
+    def get_encprivkey(self):
         """
         I return the encrypted private key.
         """
@@ -1508,8 +1503,7 @@ class MDMFSlotReadProxy:
             readvs = [(privkey_offset, privkey_length)]
             return readvs
         d.addCallback(_make_readvs)
-        d.addCallback(lambda readvs:
-            self._read(readvs, queue=queue))
+        d.addCallback(lambda readvs: self._read(readvs))
         def _process_results(results):
             assert self.shnum in results
             privkey = results[self.shnum][0]
@@ -1518,7 +1512,7 @@ class MDMFSlotReadProxy:
         return d
 
-    def get_signature(self, queue=False):
+    def get_signature(self):
         """
         I return the signature of my share.
         """
@@ -1533,8 +1527,7 @@ class MDMFSlotReadProxy:
            readvs = [(signature_offset, signature_length)]
            return readvs
         d.addCallback(_make_readvs)
-        d.addCallback(lambda readvs:
-            self._read(readvs, queue=queue))
+        d.addCallback(lambda readvs: self._read(readvs))
         def _process_results(results):
             assert self.shnum in results
             signature = results[self.shnum][0]
@@ -1543,7 +1536,7 @@ class MDMFSlotReadProxy:
         return d
 
-    def get_verification_key(self, queue=False):
+    def get_verification_key(self):
         """
         I return the verification key.
         """
@@ -1559,8 +1552,7 @@ class MDMFSlotReadProxy:
             readvs = [(vk_offset, vk_length)]
             return readvs
         d.addCallback(_make_readvs)
-        d.addCallback(lambda readvs:
-            self._read(readvs, queue=queue))
+        d.addCallback(lambda readvs: self._read(readvs))
         def _process_results(results):
             assert self.shnum in results
             verification_key = results[self.shnum][0]
@@ -1712,23 +1704,7 @@ class MDMFSlotReadProxy:
         return d
 
-    def flush(self):
-        """
-        I flush my queue of read vectors.
-        """
-        d = self._read(self._readvs)
-        def _then(results):
-            self._readvs = []
-            if isinstance(results, failure.Failure):
-                self._queue_errbacks.notify(results)
-            else:
-                self._queue_observers.notify(results)
-            self._queue_observers = observer.ObserverList()
-            self._queue_errbacks = observer.ObserverList()
-        d.addBoth(_then)
-
-    def _read(self, readvs, force_remote=False, queue=False):
+    def _read(self, readvs, force_remote=False):
         unsatisfiable = filter(lambda x: x[0] + x[1] > len(self._data), readvs)
         # TODO: It's entirely possible to tweak this so that it just
         # fulfills the requests that it can, and not demand that all
@@ -1739,19 +1715,6 @@ class MDMFSlotReadProxy:
             results = {self.shnum: results}
             return defer.succeed(results)
         else:
-            if queue:
-                start = len(self._readvs)
-                self._readvs += readvs
-                end = len(self._readvs)
-                def _get_results(results, start, end):
-                    if not self.shnum in results:
-                        return {self._shnum: [""]}
-                    return {self.shnum: results[self.shnum][start:end]}
-                d = defer.Deferred()
-                d.addCallback(_get_results, start, end)
-                self._queue_observers.subscribe(d.callback)
-                self._queue_errbacks.subscribe(d.errback)
-                return d
             return self._rref.callRemote("slot_readv",
                                          self._storage_index,
                                          [self.shnum],
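
The deleted flush()/queue machinery amounts to the pattern below — a minimal
standalone sketch, not code from the tree (QueuedReader and do_read are
made-up names). Queued calls park their Deferreds on an ObserverList, and
nothing goes over the wire until flush() runs; any code path that returns
without flushing strands every parked Deferred, which is the lost-progress
bug the commit message refers to:

    from twisted.internet import defer
    from allmydata.util import observer

    class QueuedReader:
        def __init__(self, do_read):
            self._do_read = do_read     # callable(readvs) -> Deferred
            self._readvs = []
            self._observers = observer.ObserverList()

        def read(self, readv):
            # Park the caller's Deferred; no remote call happens yet.
            self._readvs.append(readv)
            d = defer.Deferred()
            self._observers.subscribe(d.callback)
            return d

        def flush(self):
            # Only now is the batched read sent; the results fan out to
            # every Deferred subscribed since the last flush().
            d = self._do_read(self._readvs)
            self._readvs = []
            d.addCallback(self._observers.notify)

Note that per-call batching survives the change: each accessor still hands
_read() a complete list of read vectors for a single slot_readv call. Only
the cross-call batching (several accessors sharing one remote read) is gone.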

src/allmydata/mutable/retrieve.py

@@ -700,13 +700,12 @@ class Retrieve:
         ds = []
         for reader in self._active_readers:
             started = time.time()
-            d = reader.get_block_and_salt(segnum, queue=True)
+            d = reader.get_block_and_salt(segnum)
             d2 = self._get_needed_hashes(reader, segnum)
             dl = defer.DeferredList([d, d2], consumeErrors=True)
             dl.addCallback(self._validate_block, segnum, reader, started)
             dl.addErrback(self._validation_or_decoding_failed, [reader])
             ds.append(dl)
-            reader.flush()
         dl = defer.DeferredList(ds)
         if self._verify:
             dl.addCallback(lambda ignored: "")
@@ -910,12 +909,12 @@ class Retrieve:
         #needed.discard(0)
         self.log("getting blockhashes for segment %d, share %d: %s" % \
                  (segnum, reader.shnum, str(needed)))
-        d1 = reader.get_blockhashes(needed, queue=True, force_remote=True)
+        d1 = reader.get_blockhashes(needed, force_remote=True)
         if self.share_hash_tree.needed_hashes(reader.shnum):
             need = self.share_hash_tree.needed_hashes(reader.shnum)
             self.log("also need sharehashes for share %d: %s" % (reader.shnum,
                                                                  str(need)))
-            d2 = reader.get_sharehashes(need, queue=True, force_remote=True)
+            d2 = reader.get_sharehashes(need, force_remote=True)
         else:
             d2 = defer.succeed({}) # the logic in the next method
                                    # expects a dict

src/allmydata/mutable/servermap.py

@@ -676,7 +676,7 @@ class ServermapUpdater:
         # public key. We use this to validate the signature.
         if not self._node.get_pubkey():
             # fetch and set the public key.
-            d = reader.get_verification_key(queue=True)
+            d = reader.get_verification_key()
             d.addCallback(lambda results, shnum=shnum, peerid=peerid:
                 self._try_to_set_pubkey(results, peerid, shnum, lp))
             # XXX: Make self._pubkey_query_failed?
@@ -702,7 +702,7 @@ class ServermapUpdater:
         # to get the version information. In MDMF, this lives at
         # the end of the share, so unless the file is quite small,
         # we'll need to do a remote fetch to get it.
-        d3 = reader.get_signature(queue=True)
+        d3 = reader.get_signature()
         d3.addErrback(lambda error, shnum=shnum, peerid=peerid:
             self._got_corrupt_share(error, shnum, peerid, data, lp))
         # Once we have all three of these responses, we can move on
@@ -711,7 +711,7 @@ class ServermapUpdater:
         # Does the node already have a privkey? If not, we'll try to
         # fetch it here.
         if self._need_privkey:
-            d4 = reader.get_encprivkey(queue=True)
+            d4 = reader.get_encprivkey()
             d4.addCallback(lambda results, shnum=shnum, peerid=peerid:
                 self._try_to_validate_privkey(results, peerid, shnum, lp))
             d4.addErrback(lambda error, shnum=shnum, peerid=peerid:
@@ -730,11 +730,9 @@ class ServermapUpdater:
             # make the two routines share the value without
             # introducing more roundtrips?
             ds.append(reader.get_verinfo())
-            ds.append(reader.get_blockhashes(queue=True))
-            ds.append(reader.get_block_and_salt(self.start_segment,
-                                                queue=True))
-            ds.append(reader.get_block_and_salt(self.end_segment,
-                                                queue=True))
+            ds.append(reader.get_blockhashes())
+            ds.append(reader.get_block_and_salt(self.start_segment))
+            ds.append(reader.get_block_and_salt(self.end_segment))
             d5 = deferredutil.gatherResults(ds)
             d5.addCallback(self._got_update_results_one_share, shnum)
         else:
@@ -742,7 +740,6 @@ class ServermapUpdater:
         dl = defer.DeferredList([d, d2, d3, d4, d5])
         dl.addBoth(self._turn_barrier)
-        reader.flush()
         dl.addCallback(lambda results, shnum=shnum, peerid=peerid:
             self._got_signature_one_share(results, shnum, peerid, lp))
         dl.addErrback(lambda error, shnum=shnum, data=data:

src/allmydata/test/test_mutable.py

@@ -72,7 +72,9 @@ class FakeStorage:
         d = defer.Deferred()
         if not self._pending:
             self._pending_timer = reactor.callLater(1.0, self._fire_readers)
-        self._pending[peerid] = (d, shares)
+        if peerid not in self._pending:
+            self._pending[peerid] = []
+        self._pending[peerid].append( (d, shares) )
         return d
 
     def _fire_readers(self):
@@ -81,9 +83,10 @@ class FakeStorage:
         self._pending = {}
         for peerid in self._sequence:
             if peerid in pending:
-                d, shares = pending.pop(peerid)
-                eventually(d.callback, shares)
-        for (d, shares) in pending.values():
-            eventually(d.callback, shares)
+                for (d, shares) in pending.pop(peerid):
+                    eventually(d.callback, shares)
+        for peerid in pending:
+            for (d, shares) in pending[peerid]:
+                eventually(d.callback, shares)
 
     def write(self, peerid, storage_index, shnum, offset, data):
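
Why the per-peer entry must be a list: with the queue gone, Retrieve and
ServermapUpdater issue several independent reads, so FakeStorage.read() can
be called twice for the same peerid before the 1.0-second timer fires. A
sketch of the failure the hunks above avoid (assuming the test has set a
peer sequence, so reads are deferred rather than answered synchronously):

    d1 = storage.read("peer1", "si1")  # old: pending["peer1"] = (d1, shares)
    d2 = storage.read("peer1", "si1")  # old: entry overwritten, so d1 can
                                       # never fire. New code appends both,
                                       # and _fire_readers() fires each.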

src/allmydata/test/test_storage.py

@@ -2624,42 +2624,6 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
         return d
 
-    def test_reader_queue(self):
-        self.write_test_share_to_server('si1')
-        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
-        d1 = mr.get_block_and_salt(0, queue=True)
-        d2 = mr.get_blockhashes(queue=True)
-        d3 = mr.get_sharehashes(queue=True)
-        d4 = mr.get_signature(queue=True)
-        d5 = mr.get_verification_key(queue=True)
-        dl = defer.DeferredList([d1, d2, d3, d4, d5])
-        mr.flush()
-        def _print(results):
-            self.failUnlessEqual(len(results), 5)
-            # We have one read for version information and offsets, and
-            # one for everything else.
-            self.failUnlessEqual(self.rref.read_count, 2)
-            block, salt = results[0][1] # results[0] is a boolean that says
-                                        # whether or not the operation
-                                        # worked.
-            self.failUnlessEqual(self.block, block)
-            self.failUnlessEqual(self.salt, salt)
-            blockhashes = results[1][1]
-            self.failUnlessEqual(self.block_hash_tree, blockhashes)
-            sharehashes = results[2][1]
-            self.failUnlessEqual(self.share_hash_chain, sharehashes)
-            signature = results[3][1]
-            self.failUnlessEqual(self.signature, signature)
-            verification_key = results[4][1]
-            self.failUnlessEqual(self.verification_key, verification_key)
-        dl.addCallback(_print)
-        return dl
-
     def test_sdmf_writer(self):
         # Go through the motions of writing an SDMF share to the storage
         # server. Then read the storage server to see that the share got