diff --git a/src/allmydata/introducer/client.py b/src/allmydata/introducer/client.py index fa1e1efe8..60a06ce2d 100644 --- a/src/allmydata/introducer/client.py +++ b/src/allmydata/introducer/client.py @@ -24,6 +24,9 @@ from allmydata.introducer.common import sign_to_foolscap, unsign_from_foolscap,\ get_tubid_string_from_ann from allmydata.util import log, yamlutil, connection_status from allmydata.util.rrefutil import add_version_to_remote_reference +from allmydata.util.observer import ( + ObserverList, +) from allmydata.crypto.error import BadSignature from allmydata.util.assertutil import precondition @@ -64,8 +67,7 @@ class IntroducerClient(service.Service, Referenceable): self._publisher = None self._since = None - self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples - self._subscribed_service_names = set() + self._local_subscribers = {} # {servicename: ObserverList} self._subscriptions = set() # requests we've actually sent # _inbound_announcements remembers one announcement per @@ -179,21 +181,21 @@ class IntroducerClient(service.Service, Referenceable): return log.msg(*args, **kwargs) def subscribe_to(self, service_name, cb, *args, **kwargs): - self._local_subscribers.append( (service_name,cb,args,kwargs) ) - self._subscribed_service_names.add(service_name) + obs = self._local_subscribers.setdefault(service_name, ObserverList()) + obs.subscribe(lambda key_s, ann: cb(key_s, ann, *args, **kwargs)) self._maybe_subscribe() for index,(ann,key_s,when) in list(self._inbound_announcements.items()): precondition(isinstance(key_s, bytes), key_s) servicename = index[0] if servicename == service_name: - eventually(cb, key_s, ann, *args, **kwargs) + obs.notify(key_s, ann) def _maybe_subscribe(self): if not self._publisher: self.log("want to subscribe, but no introducer yet", level=log.NOISY) return - for service_name in self._subscribed_service_names: + for service_name in self._local_subscribers: if service_name in self._subscriptions: continue self._subscriptions.add(service_name) @@ -272,7 +274,7 @@ class IntroducerClient(service.Service, Referenceable): precondition(isinstance(key_s, bytes), key_s) self._debug_counts["inbound_announcement"] += 1 service_name = str(ann["service-name"]) - if service_name not in self._subscribed_service_names: + if service_name not in self._local_subscribers: self.log("announcement for a service we don't care about [%s]" % (service_name,), level=log.UNUSUAL, umid="dIpGNA") self._debug_counts["wrong_service"] += 1 @@ -343,9 +345,10 @@ class IntroducerClient(service.Service, Referenceable): def _deliver_announcements(self, key_s, ann): precondition(isinstance(key_s, bytes), key_s) service_name = str(ann["service-name"]) - for (service_name2,cb,args,kwargs) in self._local_subscribers: - if service_name2 == service_name: - eventually(cb, key_s, ann, *args, **kwargs) + + obs = self._local_subscribers.get(service_name) + if obs is not None: + obs.notify(key_s, ann) def connection_status(self): assert self.running # startService builds _introducer_reconnector