mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-03-24 04:35:17 +00:00
introducer: remove PeerCountObserver, tests are managing with purely poll-for-connected approachers
This commit is contained in:
parent
daecca6589
commit
80b72d919a
@ -3,12 +3,11 @@ import re, time, sha
|
||||
from base64 import b32decode
|
||||
from zope.interface import implements
|
||||
from twisted.application import service
|
||||
from twisted.internet import defer
|
||||
from foolscap import Referenceable
|
||||
from allmydata import node
|
||||
from allmydata.interfaces import RIIntroducerPublisherAndSubscriberService, \
|
||||
RIIntroducerSubscriberClient, IIntroducerClient
|
||||
from allmydata.util import observer, log, idlib
|
||||
from allmydata.util import log, idlib
|
||||
|
||||
class IntroducerNode(node.Node):
|
||||
PORTNUMFILE = "introducer.port"
|
||||
@ -83,95 +82,6 @@ class IntroducerService(service.MultiService, Referenceable):
|
||||
self._encoding_parameters)
|
||||
|
||||
|
||||
class PeerCountObserver:
|
||||
# This is used by unit test code to wait until peer connections have been
|
||||
# established.
|
||||
|
||||
def __init__(self):
|
||||
# The N'th element of _observers_of_enough_peers is None if nobody has
|
||||
# asked to be informed when N peers become connected, it is a
|
||||
# OneShotObserverList if someone has asked to be informed, and that list
|
||||
# is fired when N peers next become connected (or immediately if N peers
|
||||
# are already connected when they asked), and the N'th element is
|
||||
# replaced by None when the number of connected peers falls below N.
|
||||
# _observers_of_enough_peers is always just long enough to hold the
|
||||
# highest-numbered N that anyone is interested in (i.e., there are never
|
||||
# trailing Nones in _observers_of_enough_peers).
|
||||
self._observers_of_enough_peers = []
|
||||
# The N'th element of _observers_of_fewer_than_peers is None if nobody
|
||||
# has asked to be informed when we become connected to fewer than N
|
||||
# peers, it is a OneShotObserverList if someone has asked to be
|
||||
# informed, and that list is fired when we become connected to fewer
|
||||
# than N peers (or immediately if we are already connected to fewer than
|
||||
# N peers when they asked). _observers_of_fewer_than_peers is always
|
||||
# just long enough to hold the highest-numbered N that anyone is
|
||||
# interested in (i.e., there are never trailing Nones in
|
||||
# _observers_of_fewer_than_peers).
|
||||
self._observers_of_fewer_than_peers = []
|
||||
self.connection_observers = observer.ObserverList()
|
||||
|
||||
def _notify_observers_of_enough_peers(self, numpeers):
|
||||
if len(self._observers_of_enough_peers) > numpeers:
|
||||
osol = self._observers_of_enough_peers[numpeers]
|
||||
if osol:
|
||||
osol.fire(None)
|
||||
|
||||
def _remove_observers_of_enough_peers(self, numpeers):
|
||||
if len(self._observers_of_enough_peers) > numpeers:
|
||||
self._observers_of_enough_peers[numpeers] = None
|
||||
while self._observers_of_enough_peers and (not self._observers_of_enough_peers[-1]):
|
||||
self._observers_of_enough_peers.pop()
|
||||
|
||||
def _notify_observers_of_fewer_than_peers(self, numpeers):
|
||||
if len(self._observers_of_fewer_than_peers) > numpeers:
|
||||
osol = self._observers_of_fewer_than_peers[numpeers]
|
||||
if osol:
|
||||
osol.fire(None)
|
||||
self._observers_of_fewer_than_peers[numpeers] = None
|
||||
while len(self._observers_of_fewer_than_peers) > numpeers and (not self._observers_of_fewer_than_peers[-1]):
|
||||
self._observers_of_fewer_than_peers.pop()
|
||||
|
||||
def when_enough_peers(self, numpeers):
|
||||
"""
|
||||
I return a deferred that fires the next time that at least
|
||||
numpeers are connected, or fires immediately if numpeers are
|
||||
currently connected.
|
||||
"""
|
||||
self._observers_of_enough_peers.extend([None]*(numpeers+1-len(self._observers_of_enough_peers)))
|
||||
if not self._observers_of_enough_peers[numpeers]:
|
||||
self._observers_of_enough_peers[numpeers] = observer.OneShotObserverList()
|
||||
if len(self.connections) >= numpeers:
|
||||
self._observers_of_enough_peers[numpeers].fire(self)
|
||||
return self._observers_of_enough_peers[numpeers].when_fired()
|
||||
|
||||
def when_fewer_than_peers(self, numpeers):
|
||||
"""
|
||||
I return a deferred that fires the next time that fewer than numpeers
|
||||
are connected, or fires immediately if fewer than numpeers are currently
|
||||
connected.
|
||||
"""
|
||||
if len(self.connections) < numpeers:
|
||||
return defer.succeed(None)
|
||||
else:
|
||||
self._observers_of_fewer_than_peers.extend([None]*(numpeers+1-len(self._observers_of_fewer_than_peers)))
|
||||
if not self._observers_of_fewer_than_peers[numpeers]:
|
||||
self._observers_of_fewer_than_peers[numpeers] = observer.OneShotObserverList()
|
||||
return self._observers_of_fewer_than_peers[numpeers].when_fired()
|
||||
|
||||
def notify_on_new_connection(self, cb):
|
||||
"""Register a callback that will be fired (with nodeid, rref) when
|
||||
a new connection is established."""
|
||||
self.connection_observers.subscribe(cb)
|
||||
|
||||
def add_peer(self, ann):
|
||||
self._notify_observers_of_enough_peers(len(self.connections))
|
||||
self._notify_observers_of_fewer_than_peers(len(self.connections))
|
||||
|
||||
def remove_peer(self, ann):
|
||||
self._remove_observers_of_enough_peers(len(self.connections))
|
||||
self._notify_observers_of_fewer_than_peers(len(self.connections)+1)
|
||||
|
||||
|
||||
|
||||
class RemoteServiceConnector:
|
||||
"""I hold information about a peer service that we want to connect to. If
|
||||
@ -277,7 +187,6 @@ class IntroducerClient(service.Service, Referenceable):
|
||||
# self._connections is a set of (peerid, service_name, rref) tuples
|
||||
self._connections = set()
|
||||
|
||||
#self.counter = PeerCountObserver()
|
||||
self.counter = 0 # incremented each time we change state, for tests
|
||||
self.encoding_parameters = None
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user