mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-04-27 22:39:41 +00:00
upload: abort the bucket upon any write error, and do it with callRemoteOnly to avoid double errors
This commit is contained in:
parent
814922a9a1
commit
bc04b8528a
@ -251,7 +251,7 @@ class Encoder(object):
|
|||||||
d.addCallback(lambda res: self.send_uri_extension_to_all_shareholders())
|
d.addCallback(lambda res: self.send_uri_extension_to_all_shareholders())
|
||||||
|
|
||||||
d.addCallback(lambda res: self.close_all_shareholders())
|
d.addCallback(lambda res: self.close_all_shareholders())
|
||||||
d.addCallbacks(lambda res: self.done(), self.err)
|
d.addCallbacks(self.done, self.err)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def set_status(self, status):
|
def set_status(self, status):
|
||||||
@ -481,6 +481,7 @@ class Encoder(object):
|
|||||||
method=where, shnum=shareid,
|
method=where, shnum=shareid,
|
||||||
level=log.UNUSUAL, failure=why)
|
level=log.UNUSUAL, failure=why)
|
||||||
if shareid in self.landlords:
|
if shareid in self.landlords:
|
||||||
|
self.landlords[shareid].abort()
|
||||||
del self.landlords[shareid]
|
del self.landlords[shareid]
|
||||||
else:
|
else:
|
||||||
# even more UNUSUAL
|
# even more UNUSUAL
|
||||||
@ -678,7 +679,7 @@ class Encoder(object):
|
|||||||
dl.append(d)
|
dl.append(d)
|
||||||
return self._gather_responses(dl)
|
return self._gather_responses(dl)
|
||||||
|
|
||||||
def done(self):
|
def done(self, res):
|
||||||
self.log("upload done", level=log.OPERATIONAL)
|
self.log("upload done", level=log.OPERATIONAL)
|
||||||
self.set_status("Done")
|
self.set_status("Done")
|
||||||
self.set_encode_and_push_progress(extra=1.0) # done
|
self.set_encode_and_push_progress(extra=1.0) # done
|
||||||
@ -699,19 +700,11 @@ class Encoder(object):
|
|||||||
# we need to abort any remaining shareholders, so they'll delete the
|
# we need to abort any remaining shareholders, so they'll delete the
|
||||||
# partial share, allowing someone else to upload it again.
|
# partial share, allowing someone else to upload it again.
|
||||||
self.log("aborting shareholders", level=log.UNUSUAL)
|
self.log("aborting shareholders", level=log.UNUSUAL)
|
||||||
dl = []
|
|
||||||
for shareid in list(self.landlords.keys()):
|
for shareid in list(self.landlords.keys()):
|
||||||
d = self.landlords[shareid].abort()
|
self.landlords[shareid].abort()
|
||||||
d.addErrback(self._remove_shareholder, shareid, "abort")
|
|
||||||
dl.append(d)
|
|
||||||
d = self._gather_responses(dl)
|
|
||||||
def _done(res):
|
|
||||||
self.log("shareholders aborted", level=log.UNUSUAL)
|
|
||||||
if f.check(defer.FirstError):
|
if f.check(defer.FirstError):
|
||||||
return f.value.subFailure
|
return f.value.subFailure
|
||||||
return f
|
return f
|
||||||
d.addCallback(_done)
|
|
||||||
return d
|
|
||||||
|
|
||||||
def get_shares_placed(self):
|
def get_shares_placed(self):
|
||||||
# return a set of share numbers that were successfully placed.
|
# return a set of share numbers that were successfully placed.
|
||||||
|
@ -1215,7 +1215,7 @@ class WriteBucketProxy:
|
|||||||
return self._rref.callRemote("close")
|
return self._rref.callRemote("close")
|
||||||
|
|
||||||
def abort(self):
|
def abort(self):
|
||||||
return self._rref.callRemote("abort")
|
return self._rref.callRemoteOnly("abort")
|
||||||
|
|
||||||
class ReadBucketProxy:
|
class ReadBucketProxy:
|
||||||
implements(IStorageBucketReader)
|
implements(IStorageBucketReader)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user