break introducer up into separate modules in the new allmydata.introducer package

This commit is contained in:
Brian Warner 2008-06-18 12:24:16 -07:00
parent 5bdff74e5b
commit 28f4652b96
8 changed files with 135 additions and 108 deletions

View File

@ -17,7 +17,7 @@ from allmydata.download import Downloader
from allmydata.checker import Checker
from allmydata.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer import IntroducerClient
from allmydata.introducer.client import IntroducerClient
from allmydata.util import hashutil, base32, testutil
from allmydata.filenode import FileNode
from allmydata.dirnode import NewDirectoryNode

View File

@ -0,0 +1,9 @@
# This is for compatibilty with old .tac files, which reference
# allmydata.introducer.IntroducerNode
from server import IntroducerNode
# hush pyflakes
_unused = [IntroducerNode]
del _unused

View File

@ -1,112 +1,12 @@
import re, time, sha, os.path
import re, time, sha
from base64 import b32decode
from zope.interface import implements
from twisted.application import service
from foolscap import Referenceable
from allmydata import node
from allmydata.interfaces import RIIntroducerPublisherAndSubscriberService, \
RIIntroducerSubscriberClient, IIntroducerClient
from allmydata.interfaces import RIIntroducerSubscriberClient, IIntroducerClient
from allmydata.util import log, idlib
class IntroducerNode(node.Node):
PORTNUMFILE = "introducer.port"
NODETYPE = "introducer"
def __init__(self, basedir="."):
node.Node.__init__(self, basedir)
self.init_introducer()
webport = self.get_config("webport")
if webport:
self.init_web(webport) # strports string
def init_introducer(self):
introducerservice = IntroducerService(self.basedir)
self.add_service(introducerservice)
d = self.when_tub_ready()
def _publish(res):
self.introducer_url = self.tub.registerReference(introducerservice,
"introducer")
self.log(" introducer is at %s" % self.introducer_url)
self.write_config("introducer.furl", self.introducer_url + "\n")
d.addCallback(_publish)
d.addErrback(log.err, facility="tahoe.init", level=log.BAD)
def init_web(self, webport):
self.log("init_web(webport=%s)", args=(webport,))
from allmydata.webish import IntroducerWebishServer
nodeurl_path = os.path.join(self.basedir, "node.url")
ws = IntroducerWebishServer(webport, nodeurl_path)
self.add_service(ws)
def make_index(announcement):
(furl, service_name, ri_name, nickname, ver, oldest) = announcement
m = re.match(r'pb://(\w+)@', furl)
assert m
nodeid = b32decode(m.group(1).upper())
return (nodeid, service_name)
class IntroducerService(service.MultiService, Referenceable):
implements(RIIntroducerPublisherAndSubscriberService)
name = "introducer"
def __init__(self, basedir="."):
service.MultiService.__init__(self)
self.introducer_url = None
# 'index' is (tubid, service_name)
self._announcements = {} # dict of index -> (announcement, timestamp)
self._subscribers = {} # dict of (rref->timestamp) dicts
def log(self, *args, **kwargs):
if "facility" not in kwargs:
kwargs["facility"] = "tahoe.introducer"
return log.msg(*args, **kwargs)
def get_announcements(self):
return self._announcements
def get_subscribers(self):
return self._subscribers
def remote_publish(self, announcement):
self.log("introducer: announcement published: %s" % (announcement,) )
index = make_index(announcement)
if index in self._announcements:
(old_announcement, timestamp) = self._announcements[index]
if old_announcement == announcement:
self.log("but we already knew it, ignoring", level=log.NOISY)
return
else:
self.log("old announcement being updated", level=log.NOISY)
self._announcements[index] = (announcement, time.time())
(furl, service_name, ri_name, nickname, ver, oldest) = announcement
for s in self._subscribers.get(service_name, []):
s.callRemote("announce", set([announcement]))
def remote_subscribe(self, subscriber, service_name):
self.log("introducer: subscription[%s] request at %s" % (service_name,
subscriber))
if service_name not in self._subscribers:
self._subscribers[service_name] = {}
subscribers = self._subscribers[service_name]
if subscriber in subscribers:
self.log("but they're already subscribed, ignoring",
level=log.UNUSUAL)
return
subscribers[subscriber] = time.time()
def _remove():
self.log("introducer: unsubscribing[%s] %s" % (service_name,
subscriber))
subscribers.pop(subscriber, None)
subscriber.notifyOnDisconnect(_remove)
announcements = set( [ ann
for idx,(ann,when) in self._announcements.items()
if idx[1] == service_name] )
d = subscriber.callRemote("announce", announcements)
d.addErrback(log.err, facility="tahoe.introducer", level=log.UNUSUAL)
from allmydata.introducer.common import make_index
class RemoteServiceConnector:

View File

@ -0,0 +1,11 @@
import re
from base64 import b32decode
def make_index(announcement):
(furl, service_name, ri_name, nickname, ver, oldest) = announcement
m = re.match(r'pb://(\w+)@', furl)
assert m
nodeid = b32decode(m.group(1).upper())
return (nodeid, service_name)

View File

@ -0,0 +1,103 @@
import time, os.path
from zope.interface import implements
from twisted.application import service
from foolscap import Referenceable
from allmydata import node
from allmydata.interfaces import RIIntroducerPublisherAndSubscriberService
from allmydata.util import log
from allmydata.introducer.common import make_index
class IntroducerNode(node.Node):
PORTNUMFILE = "introducer.port"
NODETYPE = "introducer"
def __init__(self, basedir="."):
node.Node.__init__(self, basedir)
self.init_introducer()
webport = self.get_config("webport")
if webport:
self.init_web(webport) # strports string
def init_introducer(self):
introducerservice = IntroducerService(self.basedir)
self.add_service(introducerservice)
d = self.when_tub_ready()
def _publish(res):
self.introducer_url = self.tub.registerReference(introducerservice,
"introducer")
self.log(" introducer is at %s" % self.introducer_url)
self.write_config("introducer.furl", self.introducer_url + "\n")
d.addCallback(_publish)
d.addErrback(log.err, facility="tahoe.init", level=log.BAD)
def init_web(self, webport):
self.log("init_web(webport=%s)", args=(webport,))
from allmydata.webish import IntroducerWebishServer
nodeurl_path = os.path.join(self.basedir, "node.url")
ws = IntroducerWebishServer(webport, nodeurl_path)
self.add_service(ws)
class IntroducerService(service.MultiService, Referenceable):
implements(RIIntroducerPublisherAndSubscriberService)
name = "introducer"
def __init__(self, basedir="."):
service.MultiService.__init__(self)
self.introducer_url = None
# 'index' is (tubid, service_name)
self._announcements = {} # dict of index -> (announcement, timestamp)
self._subscribers = {} # dict of (rref->timestamp) dicts
def log(self, *args, **kwargs):
if "facility" not in kwargs:
kwargs["facility"] = "tahoe.introducer"
return log.msg(*args, **kwargs)
def get_announcements(self):
return self._announcements
def get_subscribers(self):
return self._subscribers
def remote_publish(self, announcement):
self.log("introducer: announcement published: %s" % (announcement,) )
index = make_index(announcement)
if index in self._announcements:
(old_announcement, timestamp) = self._announcements[index]
if old_announcement == announcement:
self.log("but we already knew it, ignoring", level=log.NOISY)
return
else:
self.log("old announcement being updated", level=log.NOISY)
self._announcements[index] = (announcement, time.time())
(furl, service_name, ri_name, nickname, ver, oldest) = announcement
for s in self._subscribers.get(service_name, []):
s.callRemote("announce", set([announcement]))
def remote_subscribe(self, subscriber, service_name):
self.log("introducer: subscription[%s] request at %s" % (service_name,
subscriber))
if service_name not in self._subscribers:
self._subscribers[service_name] = {}
subscribers = self._subscribers[service_name]
if subscriber in subscribers:
self.log("but they're already subscribed, ignoring",
level=log.UNUSUAL)
return
subscribers[subscriber] = time.time()
def _remove():
self.log("introducer: unsubscribing[%s] %s" % (service_name,
subscriber))
subscribers.pop(subscriber, None)
subscriber.notifyOnDisconnect(_remove)
announcements = set( [ ann
for idx,(ann,when) in self._announcements.items()
if idx[1] == service_name] )
d = subscriber.callRemote("announce", announcements)
d.addErrback(log.err, facility="tahoe.introducer", level=log.UNUSUAL)

View File

@ -5,11 +5,12 @@ from twisted.application import service
from twisted.python import log
import allmydata
from allmydata import client, introducer
from allmydata import client
from allmydata.introducer.client import IntroducerClient
from allmydata.util import base32, testutil
from foolscap.eventual import flushEventualQueue
class FakeIntroducerClient(introducer.IntroducerClient):
class FakeIntroducerClient(IntroducerClient):
def __init__(self):
self._connections = set()
def add_peer(self, nodeid):

View File

@ -9,7 +9,10 @@ from twisted.python import log
from foolscap import Tub, Referenceable
from foolscap.eventual import fireEventually, flushEventualQueue
from twisted.application import service
from allmydata.introducer import IntroducerClient, IntroducerService, IntroducerNode
from allmydata.introducer.client import IntroducerClient
from allmydata.introducer.server import IntroducerService
# test compatibility with old introducer .tac files
from allmydata.introducer import IntroducerNode
from allmydata.util import testutil, idlib
class FakeNode(Referenceable):

View File

@ -9,7 +9,7 @@ from twisted.internet.error import ConnectionDone, ConnectionLost
from twisted.application import service
import allmydata
from allmydata import client, uri, download, upload, storage, offloaded
from allmydata.introducer import IntroducerNode
from allmydata.introducer.server import IntroducerNode
from allmydata.util import deferredutil, fileutil, idlib, mathutil, testutil
from allmydata.util import log
from allmydata.scripts import runner, cli