encode.py: clean up handling of lost peers during upload, add some logging

This commit is contained in:
Brian Warner 2007-06-06 12:40:16 -07:00
parent daa4c32381
commit f4c048bbeb

@ -254,7 +254,7 @@ class Encoder(object):
dl.append(d)
subshare_hash = block_hash(subshare)
self.subshare_hashes[shareid].append(subshare_hash)
dl = defer.DeferredList(dl, fireOnOneErrback=True, consumeErrors=True)
dl = self._gather_responses(dl)
def _logit(res):
log.msg("%s uploaded %s / %s bytes of your file." % (self, self.segment_size*(segnum+1), self.segment_size*self.num_segments))
return res
@ -273,10 +273,32 @@ class Encoder(object):
def _remove_shareholder(self, why, shareid, where):
log.msg("error while sending %s to shareholder=%d: %s" %
(where, shareid, why)) # UNUSUAL
del self.landlords[shareid]
if shareid in self.landlords:
del self.landlords[shareid]
else:
# even more UNUSUAL
log.msg(" weird, they weren't in our list of landlords")
if len(self.landlords) < self.shares_of_happiness:
msg = "lost too many shareholders during upload"
raise NotEnoughPeersError(msg)
log.msg("but we can still continue with %s shares, we'll be happy "
"with at least %s" % (len(self.landlords),
self.shares_of_happiness))
def _gather_responses(self, dl):
d = defer.DeferredList(dl, fireOnOneErrback=True)
def _eatNotEnoughPeersError(f):
# all exceptions that occur while talking to a peer are handled
# in _remove_shareholder. That might raise NotEnoughPeersError,
# which will cause the DeferredList to errback but which should
# otherwise be consumed. Allow non-NotEnoughPeersError exceptions
# to pass through as an unhandled errback. We use this in lieu of
# consumeErrors=True to allow coding errors to be logged.
f.trap(NotEnoughPeersError)
return None
for d0 in dl:
d0.addErrback(_eatNotEnoughPeersError)
return d
def send_all_subshare_hash_trees(self):
log.msg("%s sending subshare hash trees" % self)
@ -285,8 +307,7 @@ class Encoder(object):
# hashes is a list of the hashes of all subshares that were sent
# to shareholder[shareid].
dl.append(self.send_one_subshare_hash_tree(shareid, hashes))
return defer.DeferredList(dl, fireOnOneErrback=True,
consumeErrors=True)
return self._gather_responses(dl)
def send_one_subshare_hash_tree(self, shareid, subshare_hashes):
t = HashTree(subshare_hashes)
@ -322,8 +343,7 @@ class Encoder(object):
needed_hash_indices = t.needed_hashes(i, include_leaf=True)
hashes = [(hi, t[hi]) for hi in needed_hash_indices]
dl.append(self.send_one_share_hash_tree(i, hashes))
return defer.DeferredList(dl, fireOnOneErrback=True,
consumeErrors=True)
return self._gather_responses(dl)
def send_one_share_hash_tree(self, shareid, needed_hashes):
if shareid not in self.landlords:
@ -340,8 +360,7 @@ class Encoder(object):
dl = []
for shareid in self.landlords.keys():
dl.append(self.send_thingA(shareid, thingA))
return defer.DeferredList(dl, fireOnOneErrback=True,
consumeErrors=True)
return self._gather_responses(dl)
def send_thingA(self, shareid, thingA):
sh = self.landlords[shareid]
@ -356,8 +375,7 @@ class Encoder(object):
d = self.landlords[shareid].callRemote("close")
d.addErrback(self._remove_shareholder, shareid, "close")
dl.append(d)
return defer.DeferredList(dl, fireOnOneErrback=True,
consumeErrors=True)
return self._gather_responses(dl)
def done(self):
log.msg("%s: upload done" % self)