mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2024-12-19 13:07:56 +00:00
Mutable repair: use new MODE_REPAIR to query all servers *and* get privkey
This fixes bug #1689. Repair was using MODE_READ to build the servermap, which doesn't try hard enough to grab the privkey, and also doesn't guarantee sending queries to all servers. This patch adds a new MODE_REPAIR which does both, and does a separate, distinct mapupdate to start wth repair cycle, instead of relying upon the (MODE_CHECK) mapupdate leftover from the filecheck that triggered the repair.
This commit is contained in:
parent
2b8a312cb7
commit
5bae4a1bd2
@ -3,11 +3,12 @@ from allmydata.uri import from_string
|
||||
from allmydata.util import base32, log
|
||||
from allmydata.check_results import CheckAndRepairResults, CheckResults
|
||||
|
||||
from allmydata.mutable.common import MODE_CHECK, CorruptShareError
|
||||
from allmydata.mutable.common import MODE_CHECK, MODE_WRITE, CorruptShareError
|
||||
from allmydata.mutable.servermap import ServerMap, ServermapUpdater
|
||||
from allmydata.mutable.retrieve import Retrieve # for verifying
|
||||
|
||||
class MutableChecker:
|
||||
SERVERMAP_MODE = MODE_CHECK
|
||||
|
||||
def __init__(self, node, storage_broker, history, monitor):
|
||||
self._node = node
|
||||
@ -26,7 +27,8 @@ class MutableChecker:
|
||||
# of finding all of the shares, and getting a good idea of
|
||||
# recoverability, etc, without verifying.
|
||||
u = ServermapUpdater(self._node, self._storage_broker, self._monitor,
|
||||
servermap, MODE_CHECK, add_lease=add_lease)
|
||||
servermap, self.SERVERMAP_MODE,
|
||||
add_lease=add_lease)
|
||||
if self._history:
|
||||
self._history.notify_mapupdate(u.get_status())
|
||||
d = u.update()
|
||||
@ -241,6 +243,8 @@ class MutableChecker:
|
||||
|
||||
|
||||
class MutableCheckAndRepairer(MutableChecker):
|
||||
SERVERMAP_MODE = MODE_WRITE # needed to get the privkey
|
||||
|
||||
def __init__(self, node, storage_broker, history, monitor):
|
||||
MutableChecker.__init__(self, node, storage_broker, history, monitor)
|
||||
self.cr_results = CheckAndRepairResults(self._storage_index)
|
||||
@ -264,7 +268,7 @@ class MutableCheckAndRepairer(MutableChecker):
|
||||
self.cr_results.repair_attempted = False
|
||||
return
|
||||
self.cr_results.repair_attempted = True
|
||||
d = self._node.repair(self.results)
|
||||
d = self._node.repair(self.results, monitor=self._monitor)
|
||||
def _repair_finished(repair_results):
|
||||
self.cr_results.repair_successful = repair_results.get_successful()
|
||||
r = CheckResults(from_string(self._node.get_uri()), self._storage_index)
|
||||
|
@ -6,6 +6,7 @@ MODE_ANYTHING = "MODE_ANYTHING" # one recoverable version
|
||||
MODE_WRITE = "MODE_WRITE" # replace all shares, probably.. not for initial
|
||||
# creation
|
||||
MODE_READ = "MODE_READ"
|
||||
MODE_REPAIR = "MODE_REPAIR" # query all peers, get the privkey
|
||||
|
||||
class NotWriteableError(Exception):
|
||||
pass
|
||||
|
@ -308,9 +308,10 @@ class MutableFileNode:
|
||||
#################################
|
||||
# IRepairable
|
||||
|
||||
def repair(self, check_results, force=False):
|
||||
def repair(self, check_results, force=False, monitor=None):
|
||||
assert ICheckResults(check_results)
|
||||
r = Repairer(self, check_results)
|
||||
r = Repairer(self, check_results, self._storage_broker,
|
||||
self._history, monitor)
|
||||
d = r.start(force)
|
||||
return d
|
||||
|
||||
|
@ -15,7 +15,7 @@ from allmydata.storage.server import si_b2a
|
||||
from pycryptopp.cipher.aes import AES
|
||||
from foolscap.api import eventually, fireEventually
|
||||
|
||||
from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, \
|
||||
from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, MODE_REPAIR, \
|
||||
UncoordinatedWriteError, NotEnoughServersError
|
||||
from allmydata.mutable.servermap import ServerMap
|
||||
from allmydata.mutable.layout import get_version_from_checkstring,\
|
||||
@ -187,7 +187,7 @@ class Publish:
|
||||
# servermap was updated in MODE_WRITE, so we can depend upon the
|
||||
# serverlist computed by that process instead of computing our own.
|
||||
assert self._servermap
|
||||
assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK)
|
||||
assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK, MODE_REPAIR)
|
||||
# we will push a version that is one larger than anything present
|
||||
# in the grid, according to the servermap.
|
||||
self._new_seqnum = self._servermap.highest_seqnum() + 1
|
||||
@ -373,7 +373,7 @@ class Publish:
|
||||
# servermap was updated in MODE_WRITE, so we can depend upon the
|
||||
# serverlist computed by that process instead of computing our own.
|
||||
if self._servermap:
|
||||
assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK)
|
||||
assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK, MODE_REPAIR)
|
||||
# we will push a version that is one larger than anything present
|
||||
# in the grid, according to the servermap.
|
||||
self._new_seqnum = self._servermap.highest_seqnum() + 1
|
||||
|
@ -3,6 +3,8 @@ from zope.interface import implements
|
||||
from twisted.internet import defer
|
||||
from allmydata.interfaces import IRepairResults, ICheckResults
|
||||
from allmydata.mutable.publish import MutableData
|
||||
from allmydata.mutable.common import MODE_REPAIR
|
||||
from allmydata.mutable.servermap import ServerMap, ServermapUpdater
|
||||
|
||||
class RepairResults:
|
||||
implements(IRepairResults)
|
||||
@ -23,10 +25,13 @@ class MustForceRepairError(Exception):
|
||||
pass
|
||||
|
||||
class Repairer:
|
||||
def __init__(self, node, check_results):
|
||||
def __init__(self, node, check_results, storage_broker, history, monitor):
|
||||
self.node = node
|
||||
self.check_results = ICheckResults(check_results)
|
||||
assert check_results.storage_index == self.node.get_storage_index()
|
||||
self._storage_broker = storage_broker
|
||||
self._history = history
|
||||
self._monitor = monitor
|
||||
|
||||
def start(self, force=False):
|
||||
# download, then re-publish. If a server had a bad share, try to
|
||||
@ -55,8 +60,17 @@ class Repairer:
|
||||
# old shares: replace old shares with the latest version
|
||||
# bogus shares (bad sigs): replace the bad one with a good one
|
||||
|
||||
smap = self.check_results.get_servermap()
|
||||
# first, update the servermap in MODE_REPAIR, which files all shares
|
||||
# and makes sure we get the privkey.
|
||||
u = ServermapUpdater(self.node, self._storage_broker, self._monitor,
|
||||
ServerMap(), MODE_REPAIR)
|
||||
if self._history:
|
||||
self._history.notify_mapupdate(u.get_status())
|
||||
d = u.update()
|
||||
d.addCallback(self._got_full_servermap, force)
|
||||
return d
|
||||
|
||||
def _got_full_servermap(self, smap, force):
|
||||
best_version = smap.best_recoverable_version()
|
||||
if not best_version:
|
||||
# the file is damaged beyond repair
|
||||
|
@ -12,8 +12,8 @@ from allmydata.storage.server import si_b2a
|
||||
from allmydata.interfaces import IServermapUpdaterStatus
|
||||
from pycryptopp.publickey import rsa
|
||||
|
||||
from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
|
||||
CorruptShareError
|
||||
from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, \
|
||||
MODE_READ, MODE_REPAIR, CorruptShareError
|
||||
from allmydata.mutable.layout import SIGNED_PREFIX_LENGTH, MDMFSlotReadProxy
|
||||
|
||||
class UpdateStatus:
|
||||
@ -426,7 +426,7 @@ class ServermapUpdater:
|
||||
self._read_size = 1000
|
||||
self._need_privkey = False
|
||||
|
||||
if mode == MODE_WRITE and not self._node.get_privkey():
|
||||
if mode in (MODE_WRITE, MODE_REPAIR) and not self._node.get_privkey():
|
||||
self._need_privkey = True
|
||||
# check+repair: repair requires the privkey, so if we didn't happen
|
||||
# to ask for it during the check, we'll have problems doing the
|
||||
@ -497,7 +497,7 @@ class ServermapUpdater:
|
||||
# might not wait for all of their answers to come back)
|
||||
self.num_servers_to_query = k + self.EPSILON
|
||||
|
||||
if self.mode == MODE_CHECK:
|
||||
if self.mode in (MODE_CHECK, MODE_REPAIR):
|
||||
# We want to query all of the servers.
|
||||
initial_servers_to_query = list(full_serverlist)
|
||||
must_query = set(initial_servers_to_query)
|
||||
@ -1063,7 +1063,7 @@ class ServermapUpdater:
|
||||
parent=lp)
|
||||
return self._done()
|
||||
|
||||
if self.mode == MODE_CHECK:
|
||||
if self.mode == (MODE_CHECK, MODE_REPAIR):
|
||||
# we used self._must_query, and we know there aren't any
|
||||
# responses still waiting, so that means we must be done
|
||||
self.log("done", parent=lp)
|
||||
|
Loading…
Reference in New Issue
Block a user