introducer: only record one announcement per (tubid,service) tuple. Fixes #343.

This commit is contained in:
Brian Warner 2008-04-23 15:05:39 -07:00
parent 55dfb697a4
commit 186492e620
3 changed files with 48 additions and 18 deletions

View File

@ -41,6 +41,13 @@ class IntroducerNode(node.Node):
ws = IntroducerWebishServer(webport, nodeurl_path) ws = IntroducerWebishServer(webport, nodeurl_path)
self.add_service(ws) self.add_service(ws)
def make_index(announcement):
(furl, service_name, ri_name, nickname, ver, oldest) = announcement
m = re.match(r'pb://(\w+)@', furl)
assert m
nodeid = b32decode(m.group(1).upper())
return (nodeid, service_name)
class IntroducerService(service.MultiService, Referenceable): class IntroducerService(service.MultiService, Referenceable):
implements(RIIntroducerPublisherAndSubscriberService) implements(RIIntroducerPublisherAndSubscriberService)
name = "introducer" name = "introducer"
@ -48,7 +55,8 @@ class IntroducerService(service.MultiService, Referenceable):
def __init__(self, basedir="."): def __init__(self, basedir="."):
service.MultiService.__init__(self) service.MultiService.__init__(self)
self.introducer_url = None self.introducer_url = None
self._announcements = {} # dict of (announcement)->timestamp # 'index' is (tubid, service_name)
self._announcements = {} # dict of index -> (announcement, timestamp)
self._subscribers = {} # dict of (rref->timestamp) dicts self._subscribers = {} # dict of (rref->timestamp) dicts
def log(self, *args, **kwargs): def log(self, *args, **kwargs):
@ -63,11 +71,16 @@ class IntroducerService(service.MultiService, Referenceable):
def remote_publish(self, announcement): def remote_publish(self, announcement):
self.log("introducer: announcement published: %s" % (announcement,) ) self.log("introducer: announcement published: %s" % (announcement,) )
index = make_index(announcement)
if index in self._announcements:
(old_announcement, timestamp) = self._announcements[index]
if old_announcement == announcement:
self.log("but we already knew it, ignoring", level=log.NOISY)
return
else:
self.log("old announcement being updated", level=log.NOISY)
self._announcements[index] = (announcement, time.time())
(furl, service_name, ri_name, nickname, ver, oldest) = announcement (furl, service_name, ri_name, nickname, ver, oldest) = announcement
if announcement in self._announcements:
self.log("but we already knew it, ignoring", level=log.NOISY)
return
self._announcements[announcement] = time.time()
for s in self._subscribers.get(service_name, []): for s in self._subscribers.get(service_name, []):
s.callRemote("announce", set([announcement])) s.callRemote("announce", set([announcement]))
@ -88,9 +101,9 @@ class IntroducerService(service.MultiService, Referenceable):
subscribers.pop(subscriber, None) subscribers.pop(subscriber, None)
subscriber.notifyOnDisconnect(_remove) subscriber.notifyOnDisconnect(_remove)
announcements = set( [ a announcements = set( [ ann
for a in self._announcements for idx,(ann,when) in self._announcements.items()
if a[1] == service_name ] ) if idx[1] == service_name] )
d = subscriber.callRemote("announce", announcements) d = subscriber.callRemote("announce", announcements)
d.addErrback(log.err, facility="tahoe.introducer", level=log.UNUSUAL) d.addErrback(log.err, facility="tahoe.introducer", level=log.UNUSUAL)
@ -126,7 +139,6 @@ class RemoteServiceConnector:
self._nodeid = b32decode(m.group(1).upper()) self._nodeid = b32decode(m.group(1).upper())
self._nodeid_s = idlib.shortnodeid_b2a(self._nodeid) self._nodeid_s = idlib.shortnodeid_b2a(self._nodeid)
self._index = (self._nodeid, service_name)
self.service_name = service_name self.service_name = service_name
self.log("attempting to connect to %s" % self._nodeid_s) self.log("attempting to connect to %s" % self._nodeid_s)
@ -142,9 +154,6 @@ class RemoteServiceConnector:
def log(self, *args, **kwargs): def log(self, *args, **kwargs):
return self._ic.log(*args, **kwargs) return self._ic.log(*args, **kwargs)
def get_index(self):
return self._index
def startConnecting(self): def startConnecting(self):
self._reconnector = self._tub.connectTo(self._furl, self._got_service) self._reconnector = self._tub.connectTo(self._furl, self._got_service)
@ -297,10 +306,11 @@ class IntroducerClient(service.Service, Referenceable):
def _new_announcement(self, announcement): def _new_announcement(self, announcement):
# this will only be called for new announcements # this will only be called for new announcements
rsc = RemoteServiceConnector(announcement, self._tub, self) index = make_index(announcement)
index = rsc.get_index()
if index in self._connectors: if index in self._connectors:
self.log("replacing earlier announcement", level=log.NOISY)
self._connectors[index].stopConnecting() self._connectors[index].stopConnecting()
rsc = RemoteServiceConnector(announcement, self._tub, self)
self._connectors[index] = rsc self._connectors[index] = rsc
rsc.startConnecting() rsc.startConnecting()

View File

@ -51,6 +51,26 @@ class TestIntroducer(unittest.TestCase, testutil.PollMixin):
i = IntroducerService() i = IntroducerService()
i.setServiceParent(self.parent) i.setServiceParent(self.parent)
def test_duplicate(self):
i = IntroducerService()
self.failUnlessEqual(len(i.get_announcements()), 0)
self.failUnlessEqual(len(i.get_subscribers()), 0)
furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"
ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
ann2 = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")
i.remote_publish(ann1)
self.failUnlessEqual(len(i.get_announcements()), 1)
self.failUnlessEqual(len(i.get_subscribers()), 0)
i.remote_publish(ann2)
self.failUnlessEqual(len(i.get_announcements()), 2)
self.failUnlessEqual(len(i.get_subscribers()), 0)
i.remote_publish(ann1b)
self.failUnlessEqual(len(i.get_announcements()), 2)
self.failUnlessEqual(len(i.get_subscribers()), 0)
def test_system(self): def test_system(self):
self.central_tub = tub = Tub() self.central_tub = tub = Tub()

View File

@ -29,7 +29,7 @@ class IntroducerRoot(rend.Page):
res["subscription_summary"] = subscription_summary res["subscription_summary"] = subscription_summary
announcement_summary = {} announcement_summary = {}
for ann in i.get_announcements(): for (ann,when) in i.get_announcements().values():
(furl, service_name, ri_name, nickname, ver, oldest) = ann (furl, service_name, ri_name, nickname, ver, oldest) = ann
if service_name not in announcement_summary: if service_name not in announcement_summary:
announcement_summary[service_name] = 0 announcement_summary[service_name] = 0
@ -48,7 +48,7 @@ class IntroducerRoot(rend.Page):
def render_announcement_summary(self, ctx, data): def render_announcement_summary(self, ctx, data):
i = IClient(ctx).getServiceNamed("introducer") i = IClient(ctx).getServiceNamed("introducer")
services = {} services = {}
for ann in i.get_announcements(): for (ann,when) in i.get_announcements().values():
(furl, service_name, ri_name, nickname, ver, oldest) = ann (furl, service_name, ri_name, nickname, ver, oldest) = ann
if service_name not in services: if service_name not in services:
services[service_name] = 0 services[service_name] = 0
@ -69,7 +69,7 @@ class IntroducerRoot(rend.Page):
def data_services(self, ctx, data): def data_services(self, ctx, data):
i = IClient(ctx).getServiceNamed("introducer") i = IClient(ctx).getServiceNamed("introducer")
ann = [(since,a) ann = [(since,a)
for (a,since) in i.get_announcements().items() for (a,since) in i.get_announcements().values()
if a[1] != "stub_client"] if a[1] != "stub_client"]
ann.sort(lambda a,b: cmp( (a[1][1], a), (b[1][1], b) ) ) ann.sort(lambda a,b: cmp( (a[1][1], a), (b[1][1], b) ) )
return ann return ann
@ -94,7 +94,7 @@ class IntroducerRoot(rend.Page):
i = IClient(ctx).getServiceNamed("introducer") i = IClient(ctx).getServiceNamed("introducer")
# use the "stub_client" announcements to get information per nodeid # use the "stub_client" announcements to get information per nodeid
clients = {} clients = {}
for ann in i.get_announcements(): for (ann,when) in i.get_announcements().values():
if ann[1] != "stub_client": if ann[1] != "stub_client":
continue continue
(furl, service_name, ri_name, nickname, ver, oldest) = ann (furl, service_name, ri_name, nickname, ver, oldest) = ann