mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-04-19 00:06:57 +00:00
immutable/downloader/finder.py: reduce use of get_serverid(), one left
This commit is contained in:
parent
d9d55ad006
commit
baf1506b9b
@ -2,7 +2,7 @@
|
||||
import time
|
||||
now = time.time
|
||||
from foolscap.api import eventually
|
||||
from allmydata.util import base32, log, idlib
|
||||
from allmydata.util import base32, log
|
||||
from twisted.internet import reactor
|
||||
|
||||
from share import Share, CommonShare
|
||||
@ -21,8 +21,8 @@ def incidentally(res, f, *args, **kwargs):
|
||||
return res
|
||||
|
||||
class RequestToken:
|
||||
def __init__(self, peerid):
|
||||
self.peerid = peerid
|
||||
def __init__(self, server):
|
||||
self.server = server
|
||||
|
||||
class ShareFinder:
|
||||
OVERDUE_TIMEOUT = 10.0
|
||||
@ -62,8 +62,7 @@ class ShareFinder:
|
||||
# test_dirnode, which creates us with storage_broker=None
|
||||
if not self._started:
|
||||
si = self.verifycap.storage_index
|
||||
servers = [(s.get_serverid(), s.get_rref())
|
||||
for s in self._storage_broker.get_servers_for_psi(si)]
|
||||
servers = self._storage_broker.get_servers_for_psi(si)
|
||||
self._servers = iter(servers)
|
||||
self._started = True
|
||||
|
||||
@ -88,7 +87,7 @@ class ShareFinder:
|
||||
|
||||
# internal methods
|
||||
def loop(self):
|
||||
pending_s = ",".join([idlib.shortnodeid_b2a(rt.peerid)
|
||||
pending_s = ",".join([rt.server.name()
|
||||
for rt in self.pending_requests]) # sort?
|
||||
self.log(format="ShareFinder loop: running=%(running)s"
|
||||
" hungry=%(hungry)s, pending=%(pending)s",
|
||||
@ -131,23 +130,21 @@ class ShareFinder:
|
||||
eventually(self.share_consumer.no_more_shares)
|
||||
|
||||
def send_request(self, server):
|
||||
peerid, rref = server
|
||||
req = RequestToken(peerid)
|
||||
req = RequestToken(server)
|
||||
self.pending_requests.add(req)
|
||||
lp = self.log(format="sending DYHB to [%(peerid)s]",
|
||||
peerid=idlib.shortnodeid_b2a(peerid),
|
||||
lp = self.log(format="sending DYHB to [%(name)s]", name=server.name(),
|
||||
level=log.NOISY, umid="Io7pyg")
|
||||
time_sent = now()
|
||||
d_ev = self._download_status.add_dyhb_sent(peerid, time_sent)
|
||||
d_ev = self._download_status.add_dyhb_sent(server.get_serverid(),
|
||||
time_sent)
|
||||
# TODO: get the timer from a Server object, it knows best
|
||||
self.overdue_timers[req] = reactor.callLater(self.OVERDUE_TIMEOUT,
|
||||
self.overdue, req)
|
||||
d = rref.callRemote("get_buckets", self._storage_index)
|
||||
d = server.get_rref().callRemote("get_buckets", self._storage_index)
|
||||
d.addBoth(incidentally, self._request_retired, req)
|
||||
d.addCallbacks(self._got_response, self._got_error,
|
||||
callbackArgs=(rref.version, peerid, req, d_ev,
|
||||
time_sent, lp),
|
||||
errbackArgs=(peerid, req, d_ev, lp))
|
||||
callbackArgs=(server, req, d_ev, time_sent, lp),
|
||||
errbackArgs=(server, req, d_ev, lp))
|
||||
d.addErrback(log.err, format="error in send_request",
|
||||
level=log.WEIRD, parent=lp, umid="rpdV0w")
|
||||
d.addCallback(incidentally, eventually, self.loop)
|
||||
@ -165,29 +162,26 @@ class ShareFinder:
|
||||
self.overdue_requests.add(req)
|
||||
eventually(self.loop)
|
||||
|
||||
def _got_response(self, buckets, server_version, peerid, req, d_ev,
|
||||
time_sent, lp):
|
||||
def _got_response(self, buckets, server, req, d_ev, time_sent, lp):
|
||||
shnums = sorted([shnum for shnum in buckets])
|
||||
time_received = now()
|
||||
d_ev.finished(shnums, time_received)
|
||||
dyhb_rtt = time_received - time_sent
|
||||
if not buckets:
|
||||
self.log(format="no shares from [%(peerid)s]",
|
||||
peerid=idlib.shortnodeid_b2a(peerid),
|
||||
self.log(format="no shares from [%(name)s]", name=server.name(),
|
||||
level=log.NOISY, parent=lp, umid="U7d4JA")
|
||||
return
|
||||
shnums_s = ",".join([str(shnum) for shnum in shnums])
|
||||
self.log(format="got shnums [%(shnums)s] from [%(peerid)s]",
|
||||
shnums=shnums_s, peerid=idlib.shortnodeid_b2a(peerid),
|
||||
self.log(format="got shnums [%(shnums)s] from [%(name)s]",
|
||||
shnums=shnums_s, name=server.name(),
|
||||
level=log.NOISY, parent=lp, umid="0fcEZw")
|
||||
shares = []
|
||||
for shnum, bucket in buckets.iteritems():
|
||||
s = self._create_share(shnum, bucket, server_version, peerid,
|
||||
dyhb_rtt)
|
||||
s = self._create_share(shnum, bucket, server, dyhb_rtt)
|
||||
shares.append(s)
|
||||
self._deliver_shares(shares)
|
||||
|
||||
def _create_share(self, shnum, bucket, server_version, peerid, dyhb_rtt):
|
||||
def _create_share(self, shnum, bucket, server, dyhb_rtt):
|
||||
if shnum in self._commonshares:
|
||||
cs = self._commonshares[shnum]
|
||||
else:
|
||||
@ -210,8 +204,8 @@ class ShareFinder:
|
||||
# 2: break _get_satisfaction into Deferred-attached pieces.
|
||||
# Yuck.
|
||||
self._commonshares[shnum] = cs
|
||||
s = Share(bucket, server_version, self.verifycap, cs, self.node,
|
||||
self._download_status, peerid, shnum, dyhb_rtt,
|
||||
s = Share(bucket, server.get_version(), self.verifycap, cs, self.node,
|
||||
self._download_status, server.get_serverid(), shnum, dyhb_rtt,
|
||||
self._node_logparent)
|
||||
return s
|
||||
|
||||
@ -223,10 +217,10 @@ class ShareFinder:
|
||||
level=log.NOISY, umid="2n1qQw")
|
||||
eventually(self.share_consumer.got_shares, shares)
|
||||
|
||||
def _got_error(self, f, peerid, req, d_ev, lp):
|
||||
def _got_error(self, f, server, req, d_ev, lp):
|
||||
d_ev.finished("error", now())
|
||||
self.log(format="got error from [%(peerid)s]",
|
||||
peerid=idlib.shortnodeid_b2a(peerid), failure=f,
|
||||
self.log(format="got error from [%(name)s]",
|
||||
name=server.name(), failure=f,
|
||||
level=log.UNUSUAL, parent=lp, umid="zUKdCw")
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user