UploadResults: store IServers internally, but still return serverids
This stores IDisplayableServer-providing instances (StubServers or NativeStorageServers) in the .servermap and .sharemap dictionaries. But get_servermap()/get_sharemap() still return data structures with serverids, not IServers, by translating their data on the way out. This lets us put off changing the callers for a little bit longer.
parent 97a1eb6ebf
commit 843739486a
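The idea, in isolation: keep rich server objects in the internal maps and translate to bare serverids only at the accessor boundary, so existing callers keep seeing the old data shapes. A minimal sketch of that pattern follows; FakeServer and SketchUploadResults are invented for illustration (only get_serverid() mirrors the real IDisplayableServer interface), not the shipped code:

    # Illustrative sketch, not the shipped code: FakeServer stands in for
    # the real StubServer/NativeStorageServer; only get_serverid() matters.

    class FakeServer:
        def __init__(self, serverid):
            self._serverid = serverid
        def get_serverid(self):
            return self._serverid

    class SketchUploadResults:
        def __init__(self, sharemap):
            # internally: {shnum: set(server objects)}
            self._sharemap = sharemap
        def get_sharemap(self):
            # externally: {shnum: set(serverid)} -- translated on the way
            # out, so callers keep seeing the old data shape
            return dict((shnum, set(s.get_serverid() for s in servers))
                        for shnum, servers in self._sharemap.items())

    s1, s2 = FakeServer("v0-aaaa"), FakeServer("v0-bbbb")
    r = SketchUploadResults({0: set([s1]), 1: set([s1, s2])})
    assert r.get_sharemap() == {0: set(["v0-aaaa"]),
                                1: set(["v0-aaaa", "v0-bbbb"])}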
@@ -64,8 +64,8 @@ class UploadResults:
                  ciphertext_fetched, # how much the helper fetched
                  preexisting_shares, # count of shares already present
                  pushed_shares, # count of shares we pushed
-                 sharemap, # {shnum: set(serverid)}
-                 servermap, # {serverid: set(shnum)}
+                 sharemap, # {shnum: set(server)}
+                 servermap, # {server: set(shnum)}
                  timings, # dict of name to number of seconds
                  uri_extension_data,
                  uri_extension_hash,
@@ -95,9 +95,17 @@ class UploadResults:
     def get_pushed_shares(self):
         return self._pushed_shares
     def get_sharemap(self):
-        return self._sharemap
+        # returns {shnum: set(serverid)}
+        sharemap = {}
+        for shnum, servers in self._sharemap.items():
+            sharemap[shnum] = set([s.get_serverid() for s in servers])
+        return sharemap
     def get_servermap(self):
-        return self._servermap
+        # returns {serverid: set(shnum)}
+        servermap = {}
+        for server, shnums in self._servermap.items():
+            servermap[server.get_serverid()] = set(shnums)
+        return servermap
     def get_timings(self):
         return self._timings
     def get_uri_extension_data(self):
@@ -144,6 +152,8 @@ class ServerTracker:
         return ("<ServerTracker for server %s and SI %s>"
                 % (self._server.get_name(), si_b2a(self.storage_index)[:5]))

+    def get_server(self):
+        return self._server
     def get_serverid(self):
         return self._server.get_serverid()
     def get_name(self):
@@ -1025,10 +1035,9 @@ class CHKUploader:
         sharemap = dictutil.DictOfSets()
         servermap = dictutil.DictOfSets()
         for shnum in e.get_shares_placed():
-            server_tracker = self._server_trackers[shnum]
-            serverid = server_tracker.get_serverid()
-            sharemap.add(shnum, serverid)
-            servermap.add(serverid, shnum)
+            server = self._server_trackers[shnum].get_server()
+            sharemap.add(shnum, server)
+            servermap.add(server, shnum)
         now = time.time()
         timings = {}
         timings["total"] = now - self._started
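For reference, dictutil.DictOfSets above is Tahoe's small multimap helper: add(key, value) inserts value into a set stored under key. A minimal equivalent, shown only to make the sharemap.add(shnum, server) calls in this hunk concrete, might look like:

    # Minimal equivalent of allmydata.util.dictutil.DictOfSets, for
    # illustration only: add(key, value) puts value into a set under key.
    class DictOfSets(dict):
        def add(self, key, value):
            if key in self:
                self[key].add(value)
            else:
                self[key] = set([value])

    sharemap = DictOfSets()
    sharemap.add(0, "server-a")
    sharemap.add(0, "server-b")
    assert sharemap == {0: set(["server-a", "server-b"])}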
@@ -1187,8 +1196,9 @@ class RemoteEncryptedUploadable(Referenceable):

 class AssistedUploader:

-    def __init__(self, helper):
+    def __init__(self, helper, storage_broker):
         self._helper = helper
+        self._storage_broker = storage_broker
         self._log_number = log.msg("AssistedUploader starting")
         self._storage_index = None
         self._upload_status = s = UploadStatus()
@@ -1307,13 +1317,22 @@ class AssistedUploader:
         now = time.time()
         timings["total"] = now - self._started

+        gss = self._storage_broker.get_stub_server
+        sharemap = {}
+        servermap = {}
+        for shnum, serverids in hur.sharemap.items():
+            sharemap[shnum] = set([gss(serverid) for serverid in serverids])
+        # if the file was already in the grid, hur.servermap is an empty dict
+        for serverid, shnums in hur.servermap.items():
+            servermap[gss(serverid)] = set(shnums)
+
         ur = UploadResults(file_size=self._size,
                            # not if already found
                            ciphertext_fetched=hur.ciphertext_fetched,
                            preexisting_shares=hur.preexisting_shares,
                            pushed_shares=hur.pushed_shares,
-                           sharemap=hur.sharemap,
-                           servermap=hur.servermap, # not if already found
+                           sharemap=sharemap,
+                           servermap=servermap,
                            timings=timings,
                            uri_extension_data=hur.uri_extension_data,
                            uri_extension_hash=hur.uri_extension_hash,
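Going the other way: the helper's results (hur) cross the wire as plain serverids, so this hunk wraps them back into server objects via the broker's get_stub_server before storing them. A hedged sketch of that translation; SketchBroker and StubServer here are invented stand-ins, not the real StorageFarmBroker:

    # Sketch of wrapping wire-format serverids back into server objects.
    # SketchBroker and StubServer are stand-ins for this example only.

    class StubServer:
        def __init__(self, serverid):
            self._serverid = serverid
        def get_serverid(self):
            return self._serverid

    class SketchBroker:
        def __init__(self):
            self._stubs = {}
        def get_stub_server(self, serverid):
            # reuse one stub per serverid so set membership stays stable
            if serverid not in self._stubs:
                self._stubs[serverid] = StubServer(serverid)
            return self._stubs[serverid]

    gss = SketchBroker().get_stub_server
    wire_sharemap = {0: set(["v0-aaaa", "v0-bbbb"])}  # as sent by the helper
    sharemap = dict((shnum, set(gss(sid) for sid in sids))
                    for shnum, sids in wire_sharemap.items())
    assert sorted(s.get_serverid() for s in sharemap[0]) == ["v0-aaaa",
                                                             "v0-bbbb"]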
@@ -1554,8 +1573,9 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
         else:
             eu = EncryptAnUploadable(uploadable, self._parentmsgid)
             d2 = defer.succeed(None)
+            storage_broker = self.parent.get_storage_broker()
             if self._helper:
-                uploader = AssistedUploader(self._helper)
+                uploader = AssistedUploader(self._helper, storage_broker)
                 d2.addCallback(lambda x: eu.get_storage_index())
                 d2.addCallback(lambda si: uploader.start(eu, si))
             else:
@@ -89,6 +89,8 @@ class FakeClient(service.MultiService):

     def get_encoding_parameters(self):
         return self.DEFAULT_ENCODING_PARAMETERS
+    def get_storage_broker(self):
+        return self.storage_broker

 def flush_but_dont_ignore(res):
     d = flushEventualQueue()
@@ -114,8 +116,8 @@ class AssistedUpload(unittest.TestCase):
     timeout = 240 # It takes longer than 120 seconds on Francois's arm box.
     def setUp(self):
         self.s = FakeClient()
-        self.storage_broker = StorageFarmBroker(None, True)
-        self.secret_holder = client.SecretHolder("lease secret", "convergence")
+        self.s.storage_broker = StorageFarmBroker(None, True)
+        self.s.secret_holder = client.SecretHolder("lease secret", "converge")
         self.s.startService()

         self.tub = t = Tub()
@@ -129,8 +131,8 @@ class AssistedUpload(unittest.TestCase):
     def setUpHelper(self, basedir, helper_class=Helper_fake_upload):
         fileutil.make_dirs(basedir)
         self.helper = h = helper_class(basedir,
-                                       self.storage_broker,
-                                       self.secret_holder,
+                                       self.s.storage_broker,
+                                       self.s.secret_holder,
                                        None, None)
         self.helper_furl = self.tub.registerReference(h)
