mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-01-13 16:29:51 +00:00
3e26c78ee3
This stores the sequence number in BASEDIR/announcement-seqnum, and increments it each time any service is published (every service announcement is regenerated with the new sequence number). As everyone knows, time is an illusion, and occasionally goes backwards, so a counter is generally safer (and reveals less information about the node). Later, we'll improve the introducer client to tolerate rollbacks (where, perhaps due to a VM being restarted from an earlier checkpoint, the stored sequence number reverts to an earlier version).
356 lines
16 KiB
Python
356 lines
16 KiB
Python
|
|
import time, os.path
|
|
from zope.interface import implements
|
|
from twisted.application import service
|
|
from foolscap.api import Referenceable
|
|
import allmydata
|
|
from allmydata import node
|
|
from allmydata.util import log, rrefutil
|
|
from allmydata.introducer.interfaces import \
|
|
RIIntroducerPublisherAndSubscriberService_v2
|
|
from allmydata.introducer.common import convert_announcement_v1_to_v2, \
|
|
convert_announcement_v2_to_v1, unsign_from_foolscap, make_index, \
|
|
get_tubid_string_from_ann, SubscriberDescriptor, AnnouncementDescriptor
|
|
|
|
class IntroducerNode(node.Node):
    """A node whose job is to host an IntroducerService.

    On construction it reads its configuration, starts the introducer
    service, and (only when a "web.port" is configured in the [node]
    section) starts the introducer web server as well.
    """

    PORTNUMFILE = "introducer.port"
    NODETYPE = "introducer"
    GENERATED_FILES = ['introducer.furl']

    def __init__(self, basedir="."):
        """Set up the node rooted at `basedir` and start its services."""
        node.Node.__init__(self, basedir)
        self.read_config()
        self.init_introducer()
        web_port = self.get_config("node", "web.port", None)
        if not web_port:
            # no web.port configured: run without the web server
            return
        self.init_web(web_port) # strports string

    def init_introducer(self):
        """Create the IntroducerService and publish it once the tub is ready.

        When the tub becomes ready the service is registered under the name
        "introducer", and the resulting FURL is remembered in
        self.introducer_url, logged, and written to the "introducer.furl"
        config file. Any failure along the way is handed to log.err.
        """
        svc = IntroducerService(self.basedir)
        self.add_service(svc)

        def _register_and_record(_ignored):
            self.introducer_url = self.tub.registerReference(svc,
                                                             "introducer")
            self.log(" introducer is at %s" % self.introducer_url,
                     umid="qF2L9A")
            self.write_config("introducer.furl", self.introducer_url + "\n")

        ready = self.when_tub_ready()
        ready.addCallback(_register_and_record)
        ready.addErrback(log.err, facility="tahoe.init",
                         level=log.BAD, umid="UaNs9A")

    def init_web(self, webport):
        """Start the introducer web server on `webport` (a strports string).

        The static directory is taken from the [node] "web.static" setting
        (default "public_html") with "~" expanded.
        """
        self.log("init_web(webport=%s)", args=(webport,), umid="2bUygA")

        # imported here rather than at module level, as in the original code
        from allmydata.webish import IntroducerWebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        configured_static = self.get_config("node", "web.static",
                                            "public_html")
        staticdir = os.path.expanduser(configured_static)
        self.add_service(IntroducerWebishServer(self, webport,
                                                nodeurl_path, staticdir))
|
|
|
|
class WrapV1SubscriberInV2Interface: # for_v1
    """Adapter around a RemoteReference to an old v1 subscriber.

    It lets the rest of the introducer talk to that subscriber as if it
    were a v2 subscriber: v2 method names passed to callRemote() are routed
    to the matching wrap_* method, which translates the call into its v1
    form. Equality and hashing are delegated to the wrapped reference.
    """

    def __init__(self, original):
        self.original = original # also used for tests

    # -- identity: compare and hash exactly like the wrapped reference --

    def __eq__(self, them):
        return self.original == them

    def __ne__(self, them):
        return self.original != them

    def __hash__(self):
        return hash(self.original)

    # -- plain pass-throughs to the wrapped reference --

    def getRemoteTubID(self):
        return self.original.getRemoteTubID()

    def getSturdyRef(self):
        return self.original.getSturdyRef()

    def getPeer(self):
        return self.original.getPeer()

    def getLocationHints(self):
        return self.original.getLocationHints()

    def notifyOnDisconnect(self, *args, **kwargs):
        return self.original.notifyOnDisconnect(*args, **kwargs)

    # -- v2 -> v1 call translation --

    def callRemote(self, methname, *args, **kwargs):
        """Dispatch a v2 remote call to the local wrap_<methname> method.

        An unknown method name raises AttributeError from the getattr.
        """
        handler = getattr(self, "wrap_" + methname)
        return handler(*args, **kwargs)

    def wrap_announce_v2(self, announcements):
        """Convert each v2 announcement to v1 and send them via "announce"."""
        v1_announcements = set()
        for ann in announcements:
            v1_announcements.add(convert_announcement_v2_to_v1(ann))
        return self.original.callRemote("announce", v1_announcements)

    def wrap_set_encoding_parameters(self, parameters):
        # note: unused
        return self.original.callRemote("set_encoding_parameters", parameters)
|
|
|
|
class IntroducerService(service.MultiService, Referenceable):
    """The introducer server object that is published to remote nodes.

    It accepts announcements from publishers (through the v1 or the signed
    v2 remote methods), remembers the latest announcement for each index,
    and forwards announcements to every subscriber of the matching service
    name. It also keeps a set of debug counters describing the traffic it
    has seen.
    """
    implements(RIIntroducerPublisherAndSubscriberService_v2)
    name = "introducer"
    # v1 is the original protocol, supported since 1.0 (but only advertised
    # starting in 1.3). v2 is the new signed protocol, supported after 1.9
    VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1": { },
                "http://allmydata.org/tahoe/protocols/introducer/v2": { },
                "application-version": str(allmydata.__full_version__),
                }

    def __init__(self, basedir="."):
        """Create an empty introducer.

        `basedir` is accepted for interface compatibility; nothing in this
        class reads it.
        """
        service.MultiService.__init__(self)
        self.introducer_url = None
        # 'index' is (service_name, key_s, tubid), where key_s or tubid is
        # None
        self._announcements = {} # dict of index ->
                                 # (ann_t, canary, ann, timestamp)

        # ann (the announcement dictionary) is cleaned up: nickname is always
        # unicode, servicename is always ascii, etc, even though
        # simplejson.loads sometimes returns either

        # self._subscribers is a dict mapping servicename to subscriptions
        # 'subscriptions' is a dict mapping rref to a subscription
        # 'subscription' is a tuple of (subscriber_info, timestamp)
        # 'subscriber_info' is a dict, provided directly for v2 clients, or
        # synthesized for v1 clients. The expected keys are:
        #  version, nickname, app-versions, my-version, oldest-supported
        self._subscribers = {}

        # self._stub_client_announcements contains the information provided
        # by v1 clients. We stash this so we can match it up with their
        # subscriptions.
        self._stub_client_announcements = {} # maps tubid to sinfo # for_v1

        self._debug_counts = {"inbound_message": 0,
                              "inbound_duplicate": 0,
                              "inbound_no_seqnum": 0,
                              "inbound_old_replay": 0,
                              "inbound_update": 0,
                              "outbound_message": 0,
                              "outbound_announcements": 0,
                              "inbound_subscribe": 0}
        self._debug_outstanding = 0 # also covers WrapV1SubscriberInV2Interface

    def _debug_retired(self, res):
        """Deferred callback/errback: note that one outbound call finished.

        Returns `res` unchanged so the rest of the chain still sees it.
        """
        self._debug_outstanding -= 1
        return res

    def log(self, *args, **kwargs):
        """Call log.msg, defaulting facility to "tahoe.introducer.server"."""
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.introducer.server"
        return log.msg(*args, **kwargs)

    def get_announcements(self, include_stub_clients=True):
        """Return a list of AnnouncementDescriptor for all announcements.

        When include_stub_clients is false, announcements whose
        "service-name" is "stub_client" are left out.
        """
        descriptors = []
        for (index, (_, canary, ann, when)) in self._announcements.items():
            if (ann["service-name"] == "stub_client"
                and not include_stub_clients):
                continue
            descriptors.append(AnnouncementDescriptor(when, index,
                                                      canary, ann))
        return descriptors

    def get_subscribers(self):
        """Return a list of SubscriberDescriptor objects for all subscribers"""
        s = []
        for service_name, subscriptions in self._subscribers.items():
            for rref,(subscriber_info,when) in subscriptions.items():
                # note that if the subscriber didn't do Tub.setLocation,
                # tubid will be None. Also, subscribers do not tell us which
                # pubkey they use; only publishers do that.
                tubid = rref.getRemoteTubID() or "?"
                advertised_addresses = rrefutil.hosts_for_rref(rref)
                remote_address = rrefutil.stringify_remote_address(rref)
                # these three assume subscriber_info["version"]==0, but
                # should tolerate other versions
                if not subscriber_info:
                    # V1 clients that haven't yet sent their stub_info data
                    subscriber_info = {}
                nickname = subscriber_info.get("nickname", u"?")
                version = subscriber_info.get("my-version", u"?")
                app_versions = subscriber_info.get("app-versions", {})
                # 'when' is the time they subscribed
                sd = SubscriberDescriptor(service_name, when,
                                          nickname, version, app_versions,
                                          advertised_addresses, remote_address,
                                          tubid)
                s.append(sd)
        return s

    def remote_get_version(self):
        """Remote method: return the VERSION dictionary."""
        return self.VERSION

    def remote_publish(self, ann_t): # for_v1
        """Remote method for v1 publishers.

        The announcement is converted to v2 form and published with no
        canary.
        """
        lp = self.log("introducer: old (v1) announcement published: %s"
                      % (ann_t,), umid="6zGOIw")
        ann_v2 = convert_announcement_v1_to_v2(ann_t)
        return self.publish(ann_v2, None, lp)

    def remote_publish_v2(self, ann_t, canary):
        """Remote method for v2 publishers."""
        lp = self.log("introducer: announcement (v2) published", umid="L2QXkQ")
        return self.publish(ann_t, canary, lp)

    def publish(self, ann_t, canary, lp):
        """Process one announcement, logging any exception it raises.

        The exception is logged (with `lp` as the parent log entry) and then
        re-raised, so nothing is swallowed. Returns None.
        """
        try:
            self._publish(ann_t, canary, lp)
        except:
            log.err(format="Introducer.remote_publish failed on %(ann)s",
                    ann=ann_t,
                    level=log.UNUSUAL, parent=lp, umid="620rWA")
            raise

    def _publish(self, ann_t, canary, lp):
        """Record an announcement and relay it to interested subscribers.

        "stub_client" announcements are only used to fill in subscriber
        info and are neither stored nor relayed. An announcement identical
        to the stored one is ignored. If the stored announcement carries a
        "seqnum", the new one must carry a strictly larger integer seqnum
        to replace it.
        """
        self._debug_counts["inbound_message"] += 1
        self.log("introducer: announcement published: %s" % (ann_t,),
                 umid="wKHgCw")
        ann, key = unsign_from_foolscap(ann_t) # might raise BadSignatureError
        index = make_index(ann, key)

        service_name = str(ann["service-name"])
        if service_name == "stub_client": # for_v1
            self._attach_stub_client(ann, lp)
            return

        old = self._announcements.get(index)
        if old:
            # Only the old announcement dict is needed. Do not unpack the
            # stored tuple into a name called 'canary': that would replace
            # the incoming canary, and the new record stored below would
            # then keep the previous announcement's canary.
            old_ann = old[2]
            if old_ann == ann:
                self.log("but we already knew it, ignoring", level=log.NOISY,
                         umid="myxzLw")
                self._debug_counts["inbound_duplicate"] += 1
                return
            if "seqnum" in old_ann:
                # must beat previous sequence number to replace
                if ("seqnum" not in ann
                    or not isinstance(ann["seqnum"], (int,long))):
                    self.log("not replacing old ann, no valid seqnum",
                             level=log.NOISY, umid="ySbaVw")
                    self._debug_counts["inbound_no_seqnum"] += 1
                    return
                if ann["seqnum"] <= old_ann["seqnum"]:
                    self.log("not replacing old ann, new seqnum is too old"
                             " (%s <= %s) (replay attack?)"
                             % (ann["seqnum"], old_ann["seqnum"]),
                             level=log.UNUSUAL, umid="sX7yqQ")
                    self._debug_counts["inbound_old_replay"] += 1
                    return
                # ok, seqnum is newer, allow replacement
            self.log("old announcement being updated", level=log.NOISY,
                     umid="304r9g")
            self._debug_counts["inbound_update"] += 1
        self._announcements[index] = (ann_t, canary, ann, time.time())
        #if canary:
        #    canary.notifyOnDisconnect ...
        # use a CanaryWatcher? with cw.is_connected()?
        # actually we just want foolscap to give rref.is_connected(), since
        # this is only for the status display

        for s in self._subscribers.get(service_name, []):
            self._debug_counts["outbound_message"] += 1
            self._debug_counts["outbound_announcements"] += 1
            self._debug_outstanding += 1
            d = s.callRemote("announce_v2", set([ann_t]))
            d.addBoth(self._debug_retired)
            d.addErrback(log.err,
                         format="subscriber errored on announcement %(ann)s",
                         ann=ann_t, facility="tahoe.introducer",
                         level=log.UNUSUAL, umid="jfGMXQ")

    def _attach_stub_client(self, ann, lp):
        """Use a v1 "stub_client" announcement to fill in subscriber info.

        The info is remembered by tubid for subscriptions that arrive
        later, and is applied to any existing subscription with the same
        tubid that has no subscriber_info yet.
        """
        # There might be a v1 subscriber for whom this is a stub_client.
        # We might have received the subscription before the stub_client
        # announcement, in which case we now need to fix up the record in
        # self._subscribers .

        # record it for later, in case the stub_client arrived before the
        # subscription
        subscriber_info = self._get_subscriber_info_from_ann(ann)
        ann_tubid = get_tubid_string_from_ann(ann)
        self._stub_client_announcements[ann_tubid] = subscriber_info

        lp2 = self.log("stub_client announcement, "
                       "looking for matching subscriber",
                       parent=lp, level=log.NOISY, umid="BTywDg")

        for sn in self._subscribers:
            s = self._subscribers[sn]
            for (subscriber, info) in s.items():
                # we correlate these by looking for a subscriber whose tubid
                # matches this announcement
                sub_tubid = subscriber.getRemoteTubID()
                if sub_tubid == ann_tubid:
                    self.log(format="found a match, nodeid=%(nodeid)s",
                             nodeid=sub_tubid,
                             level=log.NOISY, parent=lp2, umid="xsWs1A")
                    # found a match. Does it need info?
                    if not info[0]:
                        self.log(format="replacing info",
                                 level=log.NOISY, parent=lp2, umid="m5kxwA")
                        # yup. Only the value of an existing key changes,
                        # and info[1] (the subscription time) is kept.
                        s[subscriber] = (subscriber_info, info[1])
        # and we don't remember or announce stub_clients beyond what we
        # need to get the subscriber_info set up

    def _get_subscriber_info_from_ann(self, ann): # for_v1
        """Build a subscriber_info dict from a stub_client announcement.

        Raises KeyError if `ann` lacks any of the copied keys.
        """
        sinfo = { "version": ann["version"],
                  "nickname": ann["nickname"],
                  "app-versions": ann["app-versions"],
                  "my-version": ann["my-version"],
                  "oldest-supported": ann["oldest-supported"],
                  }
        return sinfo

    def remote_subscribe(self, subscriber, service_name): # for_v1
        """Remote method for v1 subscribers.

        The subscriber is wrapped so it can be driven through the v2
        interface, and is added with no subscriber_info.
        """
        self.log("introducer: old (v1) subscription[%s] request at %s"
                 % (service_name, subscriber), umid="hJlGUg")
        return self.add_subscriber(WrapV1SubscriberInV2Interface(subscriber),
                                   service_name, None)

    def remote_subscribe_v2(self, subscriber, service_name, subscriber_info):
        """Remote method for v2 subscribers."""
        self.log("introducer: subscription[%s] request at %s"
                 % (service_name, subscriber), umid="U3uzLg")
        return self.add_subscriber(subscriber, service_name, subscriber_info)

    def add_subscriber(self, subscriber, service_name, subscriber_info):
        """Register `subscriber` for `service_name` and send it what we know.

        A subscriber that is already registered for this service is
        ignored. The subscription is removed again when the subscriber
        disconnects. Returns the Deferred of the initial "announce_v2" call
        when there were announcements to send, otherwise None.
        """
        self._debug_counts["inbound_subscribe"] += 1
        subscribers = self._subscribers.setdefault(service_name, {})
        if subscriber in subscribers:
            self.log("but they're already subscribed, ignoring",
                     level=log.UNUSUAL, umid="Sy9EfA")
            return

        if not subscriber_info: # for_v1
            # v1 clients don't provide subscriber_info, but they should
            # publish a 'stub client' record which contains the same
            # information. If we've already received this, it will be in
            # self._stub_client_announcements
            tubid = subscriber.getRemoteTubID()
            subscriber_info = self._stub_client_announcements.get(
                tubid, subscriber_info)

        subscribers[subscriber] = (subscriber_info, time.time())
        def _remove():
            self.log("introducer: unsubscribing[%s] %s" % (service_name,
                                                           subscriber),
                     umid="vYGcJg")
            subscribers.pop(subscriber, None)
        subscriber.notifyOnDisconnect(_remove)

        # now tell them about any announcements they're interested in
        announcements = set( [ ann_t
                               for idx,(ann_t,canary,ann,when)
                               in self._announcements.items()
                               if idx[0] == service_name] )
        if announcements:
            self._debug_counts["outbound_message"] += 1
            self._debug_counts["outbound_announcements"] += len(announcements)
            self._debug_outstanding += 1
            d = subscriber.callRemote("announce_v2", announcements)
            d.addBoth(self._debug_retired)
            d.addErrback(log.err,
                         format="subscriber errored during subscribe %(anns)s",
                         anns=announcements, facility="tahoe.introducer",
                         level=log.UNUSUAL, umid="mtZepQ")
            return d
|