mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-01-27 23:00:45 +00:00
96c625920c
* remove when_tub_ready() from all code * synchronous-ify all node/client/introducer startup code refs ticket:2491
364 lines
17 KiB
Python
364 lines
17 KiB
Python
|
|
import time, os.path, textwrap
|
|
from zope.interface import implements
|
|
from twisted.application import service
|
|
from foolscap.api import Referenceable
|
|
import allmydata
|
|
from allmydata import node
|
|
from allmydata.util import log, rrefutil
|
|
from allmydata.util.fileutil import abspath_expanduser_unicode
|
|
from allmydata.introducer.interfaces import \
|
|
RIIntroducerPublisherAndSubscriberService_v2
|
|
from allmydata.introducer.common import convert_announcement_v1_to_v2, \
|
|
convert_announcement_v2_to_v1, unsign_from_foolscap, make_index, \
|
|
get_tubid_string_from_ann, SubscriberDescriptor, AnnouncementDescriptor
|
|
|
|
class FurlFileConflictError(Exception):
    """Raised when both the old public 'introducer.furl' and the new-style
    'private/introducer.furl' exist in the same node directory, so the old
    file cannot be migrated automatically."""
|
|
|
|
class IntroducerNode(node.Node):
    """A node whose job is to host an IntroducerService.

    On construction it reads the node configuration, registers the
    introducer service with the node's Tub (using private/introducer.furl
    as the furlFile), and, if a "web.port" is configured in the [node]
    section, starts the introducer's web server.
    """
    PORTNUMFILE = "introducer.port"
    NODETYPE = "introducer"
    GENERATED_FILES = ['introducer.furl']

    def __init__(self, basedir=u"."):
        """Set up the node rooted at 'basedir' (a unicode path).

        May raise FurlFileConflictError (see init_introducer).
        """
        node.Node.__init__(self, basedir)
        self.read_config()
        self.init_introducer()
        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string

    def init_introducer(self):
        """Create the IntroducerService and register it with the Tub.

        An old-style BASEDIR/introducer.furl is moved to
        BASEDIR/private/introducer.furl. If both files exist we cannot tell
        which FURL is the desired one, so FurlFileConflictError is raised
        and nothing is moved.

        The resulting FURL is logged and stored in self.introducer_url.
        """
        introducerservice = IntroducerService(self.basedir)
        self.add_service(introducerservice)

        old_public_fn = os.path.join(self.basedir, u"introducer.furl")
        private_fn = os.path.join(self.basedir, u"private", u"introducer.furl")

        if os.path.exists(old_public_fn):
            if os.path.exists(private_fn):
                # The literal begins with a line continuation so that every
                # line of the text shares the same margin; otherwise
                # textwrap.dedent() finds an empty common prefix and strips
                # nothing.
                msg = textwrap.dedent("""\
                    This directory (%s) contains both an old public
                    'introducer.furl' file, and a new-style
                    'private/introducer.furl', so I cannot safely remove the old
                    one. Please make sure your desired FURL is in
                    private/introducer.furl, and remove the public file. If this
                    causes your Introducer's FURL to change, you need to inform
                    all grid members so they can update their tahoe.cfg.
                    """)
                # Fill in the directory placeholder (it used to be emitted as
                # a literal "%s"). repr() keeps the message plain ASCII even
                # when basedir is a unicode path with non-ASCII characters,
                # so str() on the exception cannot fail.
                raise FurlFileConflictError(msg % (repr(self.basedir),))
            os.rename(old_public_fn, private_fn)
        furl = self.tub.registerReference(introducerservice,
                                          furlFile=private_fn)
        self.log(" introducer is at %s" % furl, umid="qF2L9A")
        self.introducer_url = furl # for tests

    def init_web(self, webport):
        """Attach an IntroducerWebishServer listening on 'webport'.

        'webport' is the strports string from the "web.port" config entry.
        The static-file directory comes from the "web.static" config entry
        (default "public_html"), resolved relative to the node's basedir.
        """
        self.log("init_web(webport=%s)", args=(webport,), umid="2bUygA")

        # imported here rather than at module level
        # NOTE(review): presumably to avoid loading the web code (or an
        # import cycle) when no web port is configured -- confirm
        from allmydata.webish import IntroducerWebishServer
        nodeurl_path = os.path.join(self.basedir, u"node.url")
        config_staticdir = self.get_config("node", "web.static", "public_html").decode('utf-8')
        staticdir = abspath_expanduser_unicode(config_staticdir, base=self.basedir)
        ws = IntroducerWebishServer(self, webport, nodeurl_path, staticdir)
        self.add_service(ws)
|
|
|
|
class WrapV1SubscriberInV2Interface: # for_v1
    """Adapter around a RemoteReference to an old v1 subscriber.

    It lets the rest of the introducer talk to the v1 subscriber as if it
    were a v2 subscriber: identity, comparison, hashing and the connection
    accessors are forwarded to the wrapped reference, and v2-style
    callRemote() requests are translated by a matching wrap_* method.
    """

    def __init__(self, original):
        # the wrapped v1 RemoteReference (tests look at this attribute too)
        self.original = original

    # Equality and hashing are forwarded to the wrapped reference.
    def __eq__(self, them):
        return self.original == them

    def __ne__(self, them):
        return self.original != them

    def __hash__(self):
        return hash(self.original)

    # Straight pass-throughs to the wrapped reference.
    def getRemoteTubID(self):
        return self.original.getRemoteTubID()

    def getSturdyRef(self):
        return self.original.getSturdyRef()

    def getPeer(self):
        return self.original.getPeer()

    def getLocationHints(self):
        return self.original.getLocationHints()

    def notifyOnDisconnect(self, *args, **kwargs):
        return self.original.notifyOnDisconnect(*args, **kwargs)

    def callRemote(self, methname, *args, **kwargs):
        """Dispatch a v2 remote call to the wrap_<methname> translator.

        getattr() raises AttributeError when no translator exists for
        'methname'.
        """
        return getattr(self, "wrap_" + methname)(*args, **kwargs)

    def wrap_announce_v2(self, announcements):
        """Convert each v2 announcement to v1 form and send the resulting
        set to the wrapped subscriber's v1 "announce" method."""
        converted = set()
        for announcement in announcements:
            converted.add(convert_announcement_v2_to_v1(announcement))
        return self.original.callRemote("announce", converted)
|
|
|
|
class IntroducerService(service.MultiService, Referenceable):
    """The introducer itself: remembers published announcements and relays
    them to subscribers.

    Publishers call remote_publish (v1) or remote_publish_v2; subscribers
    call remote_subscribe (v1) or remote_subscribe_v2. v1 messages and v1
    subscribers are converted/wrapped so that everything is stored and
    delivered in v2 form internally.
    """
    implements(RIIntroducerPublisherAndSubscriberService_v2)
    name = "introducer"
    # v1 is the original protocol, supported since 1.0 (but only advertised
    # starting in 1.3). v2 is the new signed protocol, supported after 1.9
    VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1": { },
                "http://allmydata.org/tahoe/protocols/introducer/v2": { },
                "application-version": str(allmydata.__full_version__),
                }

    def __init__(self, basedir="."):
        """Create an empty introducer service.

        'basedir' is accepted for interface compatibility; this
        constructor does not use it.
        """
        service.MultiService.__init__(self)
        self.introducer_url = None
        # 'index' is (service_name, key_s, tubid), where key_s or tubid is
        # None
        self._announcements = {} # dict of index ->
                                 # (ann_t, canary, ann, timestamp)

        # ann (the announcement dictionary) is cleaned up: nickname is always
        # unicode, servicename is always ascii, etc, even though
        # simplejson.loads sometimes returns either

        # self._subscribers is a dict mapping servicename to subscriptions
        # 'subscriptions' is a dict mapping rref to a subscription
        # 'subscription' is a tuple of (subscriber_info, timestamp)
        # 'subscriber_info' is a dict, provided directly for v2 clients, or
        # synthesized for v1 clients. The expected keys are:
        #  version, nickname, app-versions, my-version, oldest-supported
        self._subscribers = {}

        # self._stub_client_announcements contains the information provided
        # by v1 clients. We stash this so we can match it up with their
        # subscriptions.
        self._stub_client_announcements = {} # maps tubid to sinfo # for_v1

        self._debug_counts = {"inbound_message": 0,
                              "inbound_duplicate": 0,
                              "inbound_no_seqnum": 0,
                              "inbound_old_replay": 0,
                              "inbound_update": 0,
                              "outbound_message": 0,
                              "outbound_announcements": 0,
                              "inbound_subscribe": 0}
        # number of outbound announce calls that have not yet completed
        self._debug_outstanding = 0 # also covers WrapV1SubscriberInV2Interface

    def _debug_retired(self, res):
        """Deferred callback/errback: note that one outbound call finished,
        and pass the result (or failure) through unchanged."""
        self._debug_outstanding -= 1
        return res

    def log(self, *args, **kwargs):
        """Call log.msg, defaulting 'facility' to "tahoe.introducer.server".

        Returns whatever log.msg returns (used as a log parent elsewhere in
        this class).
        """
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.introducer.server"
        return log.msg(*args, **kwargs)

    def get_announcements(self, include_stub_clients=True):
        """Return a list of AnnouncementDescriptor for all announcements"""
        announcements = []
        for (index, (_, canary, ann, when)) in self._announcements.items():
            if ann["service-name"] == "stub_client":
                if not include_stub_clients:
                    continue
            ad = AnnouncementDescriptor(when, index, canary, ann)
            announcements.append(ad)
        return announcements

    def get_subscribers(self):
        """Return a list of SubscriberDescriptor objects for all subscribers"""
        s = []
        for service_name, subscriptions in self._subscribers.items():
            for rref,(subscriber_info,when) in subscriptions.items():
                # note that if the subscriber didn't do Tub.setLocation,
                # tubid will be None. Also, subscribers do not tell us which
                # pubkey they use; only publishers do that.
                tubid = rref.getRemoteTubID() or "?"
                remote_address = rrefutil.stringify_remote_address(rref)
                # these three assume subscriber_info["version"]==0, but
                # should tolerate other versions
                if not subscriber_info:
                    # V1 clients that haven't yet sent their stub_info data
                    subscriber_info = {}
                nickname = subscriber_info.get("nickname", u"?")
                version = subscriber_info.get("my-version", u"?")
                app_versions = subscriber_info.get("app-versions", {})
                # 'when' is the time they subscribed
                sd = SubscriberDescriptor(service_name, when,
                                          nickname, version, app_versions,
                                          remote_address, tubid)
                s.append(sd)
        return s

    def remote_get_version(self):
        """Remote method: return the VERSION dictionary."""
        return self.VERSION

    def remote_publish(self, ann_t): # for_v1
        """Remote method for v1 publishers: convert the announcement to v2
        form and publish it (with no canary)."""
        lp = self.log("introducer: old (v1) announcement published: %s"
                      % (ann_t,), umid="6zGOIw")
        ann_v2 = convert_announcement_v1_to_v2(ann_t)
        return self.publish(ann_v2, None, lp)

    def remote_publish_v2(self, ann_t, canary):
        """Remote method for v2 publishers: publish 'ann_t', remembering
        'canary' alongside it."""
        lp = self.log("introducer: announcement (v2) published", umid="L2QXkQ")
        return self.publish(ann_t, canary, lp)

    def publish(self, ann_t, canary, lp):
        """Process one announcement; log and re-raise any failure.

        'lp' is the log parent under which a failure is recorded.
        """
        try:
            self._publish(ann_t, canary, lp)
        except:
            # catch-all is acceptable here: the error is only logged, then
            # re-raised unchanged for the caller
            log.err(format="Introducer.remote_publish failed on %(ann)s",
                    ann=ann_t,
                    level=log.UNUSUAL, parent=lp, umid="620rWA")
            raise

    def _publish(self, ann_t, canary, lp):
        """Validate, store, and relay a single announcement.

        - "stub_client" announcements are handed to _attach_stub_client and
          are neither stored nor relayed.
        - An announcement identical to the stored one for the same index is
          ignored.
        - If the stored announcement has a "seqnum", the new one must carry
          an integer "seqnum" that is strictly larger, otherwise it is
          ignored.
        - Otherwise the announcement is stored (with the publisher's canary
          and the current time) and sent to every subscriber of its
          service name.

        unsign_from_foolscap may raise (e.g. BadSignatureError); that
        propagates to the caller.
        """
        self._debug_counts["inbound_message"] += 1
        self.log("introducer: announcement published: %s" % (ann_t,),
                 umid="wKHgCw")
        ann, key = unsign_from_foolscap(ann_t) # might raise BadSignatureError
        index = make_index(ann, key)

        service_name = str(ann["service-name"])
        if service_name == "stub_client": # for_v1
            self._attach_stub_client(ann, lp)
            return

        old = self._announcements.get(index)
        if old:
            # Unpack into distinct names: the record's canary must not
            # overwrite the 'canary' argument, which belongs to the *new*
            # announcement and is what gets stored below.
            (_old_ann_t, _old_canary, old_ann, _old_timestamp) = old
            if old_ann == ann:
                self.log("but we already knew it, ignoring", level=log.NOISY,
                         umid="myxzLw")
                self._debug_counts["inbound_duplicate"] += 1
                return
            else:
                if "seqnum" in old_ann:
                    # must beat previous sequence number to replace
                    if ("seqnum" not in ann
                        or not isinstance(ann["seqnum"], (int,long))):
                        self.log("not replacing old ann, no valid seqnum",
                                 level=log.NOISY, umid="ySbaVw")
                        self._debug_counts["inbound_no_seqnum"] += 1
                        return
                    if ann["seqnum"] <= old_ann["seqnum"]:
                        self.log("not replacing old ann, new seqnum is too old"
                                 " (%s <= %s) (replay attack?)"
                                 % (ann["seqnum"], old_ann["seqnum"]),
                                 level=log.UNUSUAL, umid="sX7yqQ")
                        self._debug_counts["inbound_old_replay"] += 1
                        return
                    # ok, seqnum is newer, allow replacement
                self.log("old announcement being updated", level=log.NOISY,
                         umid="304r9g")
                self._debug_counts["inbound_update"] += 1
        self._announcements[index] = (ann_t, canary, ann, time.time())
        #if canary:
        #    canary.notifyOnDisconnect ...
        # use a CanaryWatcher? with cw.is_connected()?
        # actually we just want foolscap to give rref.is_connected(), since
        # this is only for the status display

        for s in self._subscribers.get(service_name, []):
            self._debug_counts["outbound_message"] += 1
            self._debug_counts["outbound_announcements"] += 1
            self._debug_outstanding += 1
            d = s.callRemote("announce_v2", set([ann_t]))
            d.addBoth(self._debug_retired)
            d.addErrback(log.err,
                         format="subscriber errored on announcement %(ann)s",
                         ann=ann_t, facility="tahoe.introducer",
                         level=log.UNUSUAL, umid="jfGMXQ")

    def _attach_stub_client(self, ann, lp):
        """Record the subscriber info carried by a v1 "stub_client"
        announcement, and attach it to any existing subscription from the
        same tubid that does not have subscriber info yet."""
        # There might be a v1 subscriber for whom this is a stub_client.
        # We might have received the subscription before the stub_client
        # announcement, in which case we now need to fix up the record in
        # self._subscribers .

        # record it for later, in case the stub_client arrived before the
        # subscription
        subscriber_info = self._get_subscriber_info_from_ann(ann)
        ann_tubid = get_tubid_string_from_ann(ann)
        self._stub_client_announcements[ann_tubid] = subscriber_info

        lp2 = self.log("stub_client announcement, "
                       "looking for matching subscriber",
                       parent=lp, level=log.NOISY, umid="BTywDg")

        for sn in self._subscribers:
            s = self._subscribers[sn]
            for (subscriber, info) in s.items():
                # we correlate these by looking for a subscriber whose tubid
                # matches this announcement
                sub_tubid = subscriber.getRemoteTubID()
                if sub_tubid == ann_tubid:
                    self.log(format="found a match, nodeid=%(nodeid)s",
                             nodeid=sub_tubid,
                             level=log.NOISY, parent=lp2, umid="xsWs1A")
                    # found a match. Does it need info?
                    if not info[0]:
                        self.log(format="replacing info",
                                 level=log.NOISY, parent=lp2, umid="m5kxwA")
                        # yup
                        s[subscriber] = (subscriber_info, info[1])
        # and we don't remember or announce stub_clients beyond what we
        # need to get the subscriber_info set up

    def _get_subscriber_info_from_ann(self, ann): # for_v1
        """Build a subscriber_info dict from the fields of a stub_client
        announcement. Raises KeyError if an expected field is missing."""
        sinfo = { "version": ann["version"],
                  "nickname": ann["nickname"],
                  "app-versions": ann["app-versions"],
                  "my-version": ann["my-version"],
                  "oldest-supported": ann["oldest-supported"],
                  }
        return sinfo

    def remote_subscribe(self, subscriber, service_name): # for_v1
        """Remote method for v1 subscribers: wrap the subscriber so it can
        be treated as v2, and subscribe it without subscriber_info."""
        self.log("introducer: old (v1) subscription[%s] request at %s"
                 % (service_name, subscriber), umid="hJlGUg")
        return self.add_subscriber(WrapV1SubscriberInV2Interface(subscriber),
                                   service_name, None)

    def remote_subscribe_v2(self, subscriber, service_name, subscriber_info):
        """Remote method for v2 subscribers."""
        self.log("introducer: subscription[%s] request at %s"
                 % (service_name, subscriber), umid="U3uzLg")
        return self.add_subscriber(subscriber, service_name, subscriber_info)

    def add_subscriber(self, subscriber, service_name, subscriber_info):
        """Subscribe 'subscriber' to announcements for 'service_name'.

        A duplicate subscription is logged and ignored. The subscription is
        dropped again when the subscriber disconnects. Any announcements
        already known for this service are sent immediately.

        Returns the Deferred of that initial delivery, or None when there
        was nothing to send or the subscriber was already subscribed.
        """
        self._debug_counts["inbound_subscribe"] += 1
        if service_name not in self._subscribers:
            self._subscribers[service_name] = {}
        subscribers = self._subscribers[service_name]
        if subscriber in subscribers:
            self.log("but they're already subscribed, ignoring",
                     level=log.UNUSUAL, umid="Sy9EfA")
            return

        if not subscriber_info: # for_v1
            # v1 clients don't provide subscriber_info, but they should
            # publish a 'stub client' record which contains the same
            # information. If we've already received this, it will be in
            # self._stub_client_announcements
            tubid = subscriber.getRemoteTubID()
            if tubid in self._stub_client_announcements:
                subscriber_info = self._stub_client_announcements[tubid]

        subscribers[subscriber] = (subscriber_info, time.time())
        def _remove():
            self.log("introducer: unsubscribing[%s] %s" % (service_name,
                                                          subscriber),
                     umid="vYGcJg")
            subscribers.pop(subscriber, None)
        subscriber.notifyOnDisconnect(_remove)

        # now tell them about any announcements they're interested in
        announcements = set( [ ann_t
                               for idx,(ann_t,canary,ann,when)
                               in self._announcements.items()
                               if idx[0] == service_name] )
        if announcements:
            self._debug_counts["outbound_message"] += 1
            self._debug_counts["outbound_announcements"] += len(announcements)
            self._debug_outstanding += 1
            d = subscriber.callRemote("announce_v2", announcements)
            d.addBoth(self._debug_retired)
            d.addErrback(log.err,
                         format="subscriber errored during subscribe %(anns)s",
                         anns=announcements, facility="tahoe.introducer",
                         level=log.UNUSUAL, umid="mtZepQ")
            return d
|