mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2024-12-30 09:48:56 +00:00
Use ObserverList instead of an ad hoc reimplementation
This commit is contained in:
parent
60e401ca69
commit
b2c9296f6b
@ -24,6 +24,9 @@ from allmydata.introducer.common import sign_to_foolscap, unsign_from_foolscap,\
|
||||
get_tubid_string_from_ann
|
||||
from allmydata.util import log, yamlutil, connection_status
|
||||
from allmydata.util.rrefutil import add_version_to_remote_reference
|
||||
from allmydata.util.observer import (
|
||||
ObserverList,
|
||||
)
|
||||
from allmydata.crypto.error import BadSignature
|
||||
from allmydata.util.assertutil import precondition
|
||||
|
||||
@ -64,8 +67,7 @@ class IntroducerClient(service.Service, Referenceable):
|
||||
self._publisher = None
|
||||
self._since = None
|
||||
|
||||
self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
|
||||
self._subscribed_service_names = set()
|
||||
self._local_subscribers = {} # {servicename: ObserverList}
|
||||
self._subscriptions = set() # requests we've actually sent
|
||||
|
||||
# _inbound_announcements remembers one announcement per
|
||||
@ -179,21 +181,21 @@ class IntroducerClient(service.Service, Referenceable):
|
||||
return log.msg(*args, **kwargs)
|
||||
|
||||
def subscribe_to(self, service_name, cb, *args, **kwargs):
|
||||
self._local_subscribers.append( (service_name,cb,args,kwargs) )
|
||||
self._subscribed_service_names.add(service_name)
|
||||
obs = self._local_subscribers.setdefault(service_name, ObserverList())
|
||||
obs.subscribe(lambda key_s, ann: cb(key_s, ann, *args, **kwargs))
|
||||
self._maybe_subscribe()
|
||||
for index,(ann,key_s,when) in list(self._inbound_announcements.items()):
|
||||
precondition(isinstance(key_s, bytes), key_s)
|
||||
servicename = index[0]
|
||||
if servicename == service_name:
|
||||
eventually(cb, key_s, ann, *args, **kwargs)
|
||||
obs.notify(key_s, ann)
|
||||
|
||||
def _maybe_subscribe(self):
|
||||
if not self._publisher:
|
||||
self.log("want to subscribe, but no introducer yet",
|
||||
level=log.NOISY)
|
||||
return
|
||||
for service_name in self._subscribed_service_names:
|
||||
for service_name in self._local_subscribers:
|
||||
if service_name in self._subscriptions:
|
||||
continue
|
||||
self._subscriptions.add(service_name)
|
||||
@ -272,7 +274,7 @@ class IntroducerClient(service.Service, Referenceable):
|
||||
precondition(isinstance(key_s, bytes), key_s)
|
||||
self._debug_counts["inbound_announcement"] += 1
|
||||
service_name = str(ann["service-name"])
|
||||
if service_name not in self._subscribed_service_names:
|
||||
if service_name not in self._local_subscribers:
|
||||
self.log("announcement for a service we don't care about [%s]"
|
||||
% (service_name,), level=log.UNUSUAL, umid="dIpGNA")
|
||||
self._debug_counts["wrong_service"] += 1
|
||||
@ -343,9 +345,10 @@ class IntroducerClient(service.Service, Referenceable):
|
||||
def _deliver_announcements(self, key_s, ann):
|
||||
precondition(isinstance(key_s, bytes), key_s)
|
||||
service_name = str(ann["service-name"])
|
||||
for (service_name2,cb,args,kwargs) in self._local_subscribers:
|
||||
if service_name2 == service_name:
|
||||
eventually(cb, key_s, ann, *args, **kwargs)
|
||||
|
||||
obs = self._local_subscribers.get(service_name)
|
||||
if obs is not None:
|
||||
obs.notify(key_s, ann)
|
||||
|
||||
def connection_status(self):
|
||||
assert self.running # startService builds _introducer_reconnector
|
||||
|
Loading…
Reference in New Issue
Block a user