mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-01-14 00:39:57 +00:00
f9e261d939
When we establish any new connection, reset the delays on all the other Reconnectors. This will trigger a new batch of connection attempts. The idea is to detect when we (the client) have been offline for a while, and to connect to all servers when we get back online. By accelerating the timers inside the Reconnectors, we try to avoid spending a long time in a partially-connected state (which increases the chances of causing problems with mutable files, by not updating all the shares that we ought to).
369 lines
14 KiB
Python
369 lines
14 KiB
Python
|
|
import re, time, sha, os.path
|
|
from base64 import b32decode
|
|
from zope.interface import implements
|
|
from twisted.application import service
|
|
from foolscap import Referenceable
|
|
from allmydata import node
|
|
from allmydata.interfaces import RIIntroducerPublisherAndSubscriberService, \
|
|
RIIntroducerSubscriberClient, IIntroducerClient
|
|
from allmydata.util import log, idlib
|
|
|
|
class IntroducerNode(node.Node):
|
|
PORTNUMFILE = "introducer.port"
|
|
NODETYPE = "introducer"
|
|
|
|
def __init__(self, basedir="."):
|
|
node.Node.__init__(self, basedir)
|
|
self.init_introducer()
|
|
webport = self.get_config("webport")
|
|
if webport:
|
|
self.init_web(webport) # strports string
|
|
|
|
def init_introducer(self):
|
|
introducerservice = IntroducerService(self.basedir)
|
|
self.add_service(introducerservice)
|
|
|
|
d = self.when_tub_ready()
|
|
def _publish(res):
|
|
self.introducer_url = self.tub.registerReference(introducerservice,
|
|
"introducer")
|
|
self.log(" introducer is at %s" % self.introducer_url)
|
|
self.write_config("introducer.furl", self.introducer_url + "\n")
|
|
d.addCallback(_publish)
|
|
d.addErrback(log.err, facility="tahoe.init", level=log.BAD)
|
|
|
|
def init_web(self, webport):
|
|
self.log("init_web(webport=%s)", args=(webport,))
|
|
|
|
from allmydata.webish import IntroducerWebishServer
|
|
nodeurl_path = os.path.join(self.basedir, "node.url")
|
|
ws = IntroducerWebishServer(webport, nodeurl_path)
|
|
self.add_service(ws)
|
|
|
|
class IntroducerService(service.MultiService, Referenceable):
|
|
implements(RIIntroducerPublisherAndSubscriberService)
|
|
name = "introducer"
|
|
|
|
def __init__(self, basedir="."):
|
|
service.MultiService.__init__(self)
|
|
self.introducer_url = None
|
|
self._announcements = {} # dict of (announcement)->timestamp
|
|
self._subscribers = {} # dict of (rref->timestamp) dicts
|
|
|
|
def log(self, *args, **kwargs):
|
|
if "facility" not in kwargs:
|
|
kwargs["facility"] = "tahoe.introducer"
|
|
return log.msg(*args, **kwargs)
|
|
|
|
def get_announcements(self):
|
|
return self._announcements
|
|
def get_subscribers(self):
|
|
return self._subscribers
|
|
|
|
def remote_publish(self, announcement):
|
|
self.log("introducer: announcement published: %s" % (announcement,) )
|
|
(furl, service_name, ri_name, nickname, ver, oldest) = announcement
|
|
if announcement in self._announcements:
|
|
self.log("but we already knew it, ignoring", level=log.NOISY)
|
|
return
|
|
self._announcements[announcement] = time.time()
|
|
for s in self._subscribers.get(service_name, []):
|
|
s.callRemote("announce", set([announcement]))
|
|
|
|
def remote_subscribe(self, subscriber, service_name):
|
|
self.log("introducer: subscription[%s] request at %s" % (service_name,
|
|
subscriber))
|
|
if service_name not in self._subscribers:
|
|
self._subscribers[service_name] = {}
|
|
subscribers = self._subscribers[service_name]
|
|
if subscriber in subscribers:
|
|
self.log("but they're already subscribed, ignoring",
|
|
level=log.UNUSUAL)
|
|
return
|
|
subscribers[subscriber] = time.time()
|
|
def _remove():
|
|
self.log("introducer: unsubscribing[%s] %s" % (service_name,
|
|
subscriber))
|
|
subscribers.pop(subscriber, None)
|
|
subscriber.notifyOnDisconnect(_remove)
|
|
|
|
announcements = set( [ a
|
|
for a in self._announcements
|
|
if a[1] == service_name ] )
|
|
d = subscriber.callRemote("announce", announcements)
|
|
d.addErrback(log.err, facility="tahoe.introducer", level=log.UNUSUAL)
|
|
|
|
|
|
|
|
class RemoteServiceConnector:
|
|
"""I hold information about a peer service that we want to connect to. If
|
|
we are connected, I hold the RemoteReference, the peer's address, and the
|
|
peer's version information. I remember information about when we were
|
|
last connected to the peer too, even if we aren't currently connected.
|
|
|
|
@ivar announcement_time: when we first heard about this service
|
|
@ivar last_connect_time: when we last established a connection
|
|
@ivar last_loss_time: when we last lost a connection
|
|
|
|
@ivar version: the peer's version, from the most recent announcement
|
|
@ivar oldest_supported: the peer's oldest supported version, same
|
|
@ivar nickname: the peer's self-reported nickname, same
|
|
|
|
@ivar rref: the RemoteReference, if connected, otherwise None
|
|
@ivar remote_host: the IAddress, if connected, otherwise None
|
|
"""
|
|
|
|
def __init__(self, announcement, tub, ic):
|
|
self._tub = tub
|
|
self._announcement = announcement
|
|
self._ic = ic
|
|
(furl, service_name, ri_name, nickname, ver, oldest) = announcement
|
|
|
|
self._furl = furl
|
|
m = re.match(r'pb://(\w+)@', furl)
|
|
assert m
|
|
self._nodeid = b32decode(m.group(1).upper())
|
|
self._nodeid_s = idlib.shortnodeid_b2a(self._nodeid)
|
|
|
|
self._index = (self._nodeid, service_name)
|
|
self.service_name = service_name
|
|
|
|
self.log("attempting to connect to %s" % self._nodeid_s)
|
|
self.announcement_time = time.time()
|
|
self.last_loss_time = None
|
|
self.rref = None
|
|
self.remote_host = None
|
|
self.last_connect_time = None
|
|
self.version = ver
|
|
self.oldest_supported = oldest
|
|
self.nickname = nickname
|
|
|
|
def log(self, *args, **kwargs):
|
|
return self._ic.log(*args, **kwargs)
|
|
|
|
def get_index(self):
|
|
return self._index
|
|
|
|
def startConnecting(self):
|
|
self._reconnector = self._tub.connectTo(self._furl, self._got_service)
|
|
|
|
def stopConnecting(self):
|
|
self._reconnector.stopConnecting()
|
|
|
|
def _got_service(self, rref):
|
|
self.last_connect_time = time.time()
|
|
self.remote_host = rref.tracker.broker.transport.getPeer()
|
|
|
|
self.rref = rref
|
|
self.log("connected to %s" % self._nodeid_s)
|
|
|
|
self._ic.add_connection(self._nodeid, self.service_name, rref)
|
|
|
|
rref.notifyOnDisconnect(self._lost, rref)
|
|
|
|
def _lost(self, rref):
|
|
self.log("lost connection to %s" % self._nodeid_s)
|
|
self.last_loss_time = time.time()
|
|
self.rref = None
|
|
self.remote_host = None
|
|
self._ic.remove_connection(self._nodeid, self.service_name, rref)
|
|
|
|
def reset(self):
|
|
self._reconnector.reset()
|
|
|
|
|
|
class IntroducerClient(service.Service, Referenceable):
|
|
implements(RIIntroducerSubscriberClient, IIntroducerClient)
|
|
|
|
def __init__(self, tub, introducer_furl,
|
|
nickname, my_version, oldest_supported):
|
|
self._tub = tub
|
|
self.introducer_furl = introducer_furl
|
|
|
|
self._nickname = nickname
|
|
self._my_version = my_version
|
|
self._oldest_supported = oldest_supported
|
|
|
|
self._published_announcements = set()
|
|
|
|
self._publisher = None
|
|
self._connected = False
|
|
|
|
self._subscribed_service_names = set()
|
|
self._subscriptions = set() # requests we've actually sent
|
|
self._received_announcements = set()
|
|
# TODO: this set will grow without bound, until the node is restarted
|
|
|
|
# we only accept one announcement per (peerid+service_name) pair.
|
|
# This insures that an upgraded host replace their previous
|
|
# announcement. It also means that each peer must have their own Tub
|
|
# (no sharing), which is slightly weird but consistent with the rest
|
|
# of the Tahoe codebase.
|
|
self._connectors = {} # k: (peerid+svcname), v: RemoteServiceConnector
|
|
# self._connections is a set of (peerid, service_name, rref) tuples
|
|
self._connections = set()
|
|
|
|
self.counter = 0 # incremented each time we change state, for tests
|
|
self.encoding_parameters = None
|
|
|
|
def startService(self):
|
|
service.Service.startService(self)
|
|
rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
|
|
self._introducer_reconnector = rc
|
|
def connect_failed(failure):
|
|
self.log("Initial Introducer connection failed: perhaps it's down",
|
|
level=log.WEIRD, failure=failure)
|
|
d = self._tub.getReference(self.introducer_furl)
|
|
d.addErrback(connect_failed)
|
|
|
|
def _got_introducer(self, publisher):
|
|
self.log("connected to introducer")
|
|
self._connected = True
|
|
self._publisher = publisher
|
|
publisher.notifyOnDisconnect(self._disconnected)
|
|
self._maybe_publish()
|
|
self._maybe_subscribe()
|
|
|
|
def _disconnected(self):
|
|
self.log("bummer, we've lost our connection to the introducer")
|
|
self._connected = False
|
|
self._publisher = None
|
|
self._subscriptions.clear()
|
|
|
|
def stopService(self):
|
|
service.Service.stopService(self)
|
|
self._introducer_reconnector.stopConnecting()
|
|
for rsc in self._connectors.itervalues():
|
|
rsc.stopConnecting()
|
|
|
|
def log(self, *args, **kwargs):
|
|
if "facility" not in kwargs:
|
|
kwargs["facility"] = "tahoe.introducer"
|
|
return log.msg(*args, **kwargs)
|
|
|
|
|
|
def publish(self, furl, service_name, remoteinterface_name):
|
|
ann = (furl, service_name, remoteinterface_name,
|
|
self._nickname, self._my_version, self._oldest_supported)
|
|
self._published_announcements.add(ann)
|
|
self._maybe_publish()
|
|
|
|
def subscribe_to(self, service_name):
|
|
self._subscribed_service_names.add(service_name)
|
|
self._maybe_subscribe()
|
|
|
|
def _maybe_subscribe(self):
|
|
if not self._publisher:
|
|
self.log("want to subscribe, but no introducer yet",
|
|
level=log.NOISY)
|
|
return
|
|
for service_name in self._subscribed_service_names:
|
|
if service_name not in self._subscriptions:
|
|
# there is a race here, but the subscription desk ignores
|
|
# duplicate requests.
|
|
self._subscriptions.add(service_name)
|
|
d = self._publisher.callRemote("subscribe", self, service_name)
|
|
d.addErrback(log.err, facility="tahoe.introducer",
|
|
level=log.WEIRD)
|
|
|
|
def _maybe_publish(self):
|
|
if not self._publisher:
|
|
self.log("want to publish, but no introducer yet", level=log.NOISY)
|
|
return
|
|
# this re-publishes everything. The Introducer ignores duplicates
|
|
for ann in self._published_announcements:
|
|
d = self._publisher.callRemote("publish", ann)
|
|
d.addErrback(log.err, facility="tahoe.introducer",
|
|
level=log.WEIRD)
|
|
|
|
|
|
|
|
def remote_announce(self, announcements):
|
|
for ann in announcements:
|
|
self.log("received %d announcements" % len(announcements))
|
|
(furl, service_name, ri_name, nickname, ver, oldest) = ann
|
|
if service_name not in self._subscribed_service_names:
|
|
self.log("announcement for a service we don't care about [%s]"
|
|
% (service_name,), level=log.WEIRD)
|
|
continue
|
|
if ann in self._received_announcements:
|
|
self.log("ignoring old announcement: %s" % (ann,),
|
|
level=log.NOISY)
|
|
continue
|
|
self.log("new announcement[%s]: %s" % (service_name, ann))
|
|
self._received_announcements.add(ann)
|
|
self._new_announcement(ann)
|
|
|
|
def _new_announcement(self, announcement):
|
|
# this will only be called for new announcements
|
|
rsc = RemoteServiceConnector(announcement, self._tub, self)
|
|
index = rsc.get_index()
|
|
if index in self._connectors:
|
|
self._connectors[index].stopConnecting()
|
|
self._connectors[index] = rsc
|
|
rsc.startConnecting()
|
|
|
|
def add_connection(self, nodeid, service_name, rref):
|
|
self._connections.add( (nodeid, service_name, rref) )
|
|
self.counter += 1
|
|
# when one connection is established, reset the timers on all others,
|
|
# to trigger a reconnection attempt in one second. This is intended
|
|
# to accelerate server connections when we've been offline for a
|
|
# while. The goal is to avoid hanging out for a long time with
|
|
# connections to only a subset of the servers, which would increase
|
|
# the chances that we'll put shares in weird places (and not update
|
|
# existing shares of mutable files). See #374 for more details.
|
|
for rsc in self._connectors.values():
|
|
rsc.reset()
|
|
|
|
def remove_connection(self, nodeid, service_name, rref):
|
|
self._connections.discard( (nodeid, service_name, rref) )
|
|
self.counter += 1
|
|
|
|
|
|
def get_all_connections(self):
|
|
return frozenset(self._connections)
|
|
|
|
def get_all_connectors(self):
|
|
return self._connectors.copy()
|
|
|
|
def get_all_peerids(self):
|
|
return frozenset([peerid
|
|
for (peerid, service_name, rref)
|
|
in self._connections])
|
|
|
|
def get_all_connections_for(self, service_name):
|
|
return frozenset([c
|
|
for c in self._connections
|
|
if c[1] == service_name])
|
|
|
|
def get_permuted_peers(self, service_name, key):
|
|
"""Return an ordered list of (peerid, rref) tuples."""
|
|
|
|
results = []
|
|
for (c_peerid, c_service_name, rref) in self._connections:
|
|
assert isinstance(c_peerid, str)
|
|
if c_service_name != service_name:
|
|
continue
|
|
permuted = sha.new(key + c_peerid).digest()
|
|
results.append((permuted, c_peerid, rref))
|
|
|
|
results.sort(lambda a,b: cmp(a[0], b[0]))
|
|
return [ (r[1], r[2]) for r in results ]
|
|
|
|
|
|
|
|
def remote_set_encoding_parameters(self, parameters):
|
|
self.encoding_parameters = parameters
|
|
|
|
def connected_to_introducer(self):
|
|
return self._connected
|
|
|
|
def debug_disconnect_from_peerid(self, victim_nodeid):
|
|
# for unit tests: locate and sever all connections to the given
|
|
# peerid.
|
|
for (nodeid, service_name, rref) in self._connections:
|
|
if nodeid == victim_nodeid:
|
|
rref.tracker.broker.transport.loseConnection()
|