Mirror of https://github.com/tahoe-lafs/tahoe-lafs.git
commit 25ff9e1f97 (parent ad52a1bf9a)

    complete the Introducer changes, separate out vdrive access, make everything work again
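This commit replaces the single roster_pburl configuration file with two FURL files: introducer.furl (how to reach the introducer) and vdrive.furl (how to reach the global virtual drive root). A minimal sketch of what a client basedir needs after this change, modeled on the new tests further down; the FURL strings below are placeholders, not real values:

    import os
    from allmydata import client

    basedir = "demo-client"
    os.mkdir(basedir)
    # Both FURLs are published by the queen into its own basedir; copy them here.
    open(os.path.join(basedir, "introducer.furl"), "w").write("pb://PLACEHOLDER@host:port/introducer\n")
    open(os.path.join(basedir, "vdrive.furl"), "w").write("pb://PLACEHOLDER@host:port/vdrive\n")

    c = client.Client(basedir)  # reads both files in __init__, connects once tub_ready() fires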
@@ -1,21 +1,19 @@

import os, sha
from foolscap import Referenceable
from twisted.application import service
from twisted.python import log
from zope.interface import implements
from allmydata.interfaces import RIClient
from allmydata import node

from twisted.internet import defer

from allmydata.util import idlib
from allmydata.storageserver import StorageServer
from allmydata.upload import Uploader
from allmydata.download import Downloader
from allmydata.vdrive import VDrive
from allmydata.webish import WebishServer
from allmydata.control import ControlServer
from allmydata.introducer import IntroducerClient

class Client(node.Node, Referenceable):
    implements(RIClient)
@@ -24,12 +22,14 @@ class Client(node.Node, Referenceable):
    STOREDIR = 'storage'
    NODETYPE = "client"
    WEBPORTFILE = "webport"
    QUEEN_PBURL_FILE = "roster_pburl"
    INTRODUCER_FURL_FILE = "introducer.furl"
    GLOBAL_VDRIVE_FURL_FILE = "vdrive.furl"

    def __init__(self, basedir="."):
        node.Node.__init__(self, basedir)
        self.queen = None # self.queen is either None or a RemoteReference
        self.my_pburl = None
        self.introducer_client = None
        self.connected_to_vdrive = False
        self.add_service(StorageServer(os.path.join(basedir, self.STOREDIR)))
        self.add_service(Uploader())
        self.add_service(Downloader())
@@ -40,37 +40,31 @@ class Client(node.Node, Referenceable):
            webport = f.read() # strports string
            f.close()
            self.add_service(WebishServer(webport))
        self.queen_pburl = None
        QUEEN_PBURL_FILE = os.path.join(self.basedir, self.QUEEN_PBURL_FILE)
        if os.path.exists(QUEEN_PBURL_FILE):
            f = open(QUEEN_PBURL_FILE, "r")
            self.queen_pburl = f.read().strip()
            f.close()
        self.queen_connector = None

        INTRODUCER_FURL_FILE = os.path.join(self.basedir,
                                            self.INTRODUCER_FURL_FILE)
        f = open(INTRODUCER_FURL_FILE, "r")
        self.introducer_furl = f.read().strip()
        f.close()

        GLOBAL_VDRIVE_FURL_FILE = os.path.join(self.basedir,
                                               self.GLOBAL_VDRIVE_FURL_FILE)
        f = open(GLOBAL_VDRIVE_FURL_FILE, "r")
        self.global_vdrive_furl = f.read().strip()
        f.close()

    def tub_ready(self):
        self.log("tub_ready")
        self.my_pburl = self.tub.registerReference(self)
        if self.queen_pburl:
            self.introducer_client = IntroducerClient(self.tub, self.queen_pburl, self.my_pburl)

        ic = IntroducerClient(self.tub, self.introducer_furl, self.my_pburl)
        self.introducer_client = ic
        ic.setServiceParent(self)

        self.register_control()
        self.maybe_connect_to_queen()

    def set_queen_pburl(self, queen_pburl):
        self.queen_pburl = queen_pburl
        self.maybe_connect_to_queen()

    def maybe_connect_to_queen(self):
        if not self.running:
            return
        if not self.my_pburl:
            return
        if self.queen_connector:
            return
        if not self.queen_pburl:
            self.log("no queen_pburl, cannot connect")
            return
        self.queen_connector = self.tub.connectTo(self.queen_pburl,
                                                  self._got_queen)
        self.vdrive_connector = self.tub.connectTo(self.global_vdrive_furl,
                                                   self._got_vdrive)

    def register_control(self):
        c = ControlServer()
@@ -81,38 +75,37 @@ class Client(node.Node, Referenceable):
        f.close()
        os.chmod("control.pburl", 0600)

    def stopService(self):
        if self.introducer_client:
            self.introducer_client.stop()
        return service.MultiService.stopService(self)

    def _got_queen(self, queen):
        self.log("connected to queen")
        d.addCallback(lambda x: queen.callRemote("get_global_vdrive"))
        d.addCallback(self._got_vdrive_root)

    def _got_vdrive_root(self, root):
        self.getServiceNamed("vdrive").set_root(root)
    def _got_vdrive(self, vdrive_root):
        # vdrive_root implements RIMutableDirectoryNode
        self.log("connected to vdrive")
        self.connected_to_vdrive = True
        self.getServiceNamed("vdrive").set_root(vdrive_root)
        if "webish" in self.namedServices:
            self.getServiceNamed("webish").set_root_dirnode(root)
            self.getServiceNamed("webish").set_root_dirnode(vdrive_root)
        def _disconnected():
            self.connected_to_vdrive = False
        vdrive_root.notifyOnDisconnect(_disconnected)

    def remote_get_service(self, name):
        # TODO: 'vdrive' should not be public in the medium term
        return self.getServiceNamed(name)

    def get_remote_service(self, nodeid, servicename):
        if nodeid not in self.connections:
        if nodeid not in self.introducer_client.connections:
            return defer.fail(IndexError("no connection to that peer"))
        peer = self.connections[nodeid]
        peer = self.introducer_client.connections[nodeid]
        d = peer.callRemote("get_service", name=servicename)
        return d

    def get_all_peerids(self):
        return self.introducer_client.connections.iterkeys()

    def permute_peerids(self, key, max_count=None):
        # TODO: eventually reduce memory consumption by doing an insertion
        # sort of at most max_count elements
        results = []
        for nodeid in self.all_peers:
        for nodeid in self.get_all_peerids():
            assert isinstance(nodeid, str)
            permuted = sha.new(key + nodeid).digest()
            results.append((permuted, nodeid))
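The permute_peerids() change above keeps the existing selection scheme — rank every known peer by SHA1(key + nodeid) — but now draws the nodeids from the IntroducerClient instead of a local table. A standalone sketch of the same permutation, written for a modern Python with hashlib since the old sha module is gone; the nodeids here are made up:

    import hashlib

    def permute_peerids(key, peerids):
        # Rank each peer by SHA1(key + peerid): stable for a given key,
        # but a different ordering for each key.
        return [peerid for _, peerid in
                sorted((hashlib.sha1(key + peerid).digest(), peerid)
                       for peerid in peerids)]

    fake_peers = [bytes([i]) * 20 for i in range(5)]   # stand-in 20-byte nodeids
    print(permute_peerids(b"some-storage-index", fake_peers))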
@@ -1,6 +1,6 @@

from zope.interface import Interface
from foolscap.schema import StringConstraint, ListOf, SetOf, TupleOf, Any
from foolscap.schema import StringConstraint, ListOf, TupleOf, Any
from foolscap import RemoteInterface

Nodeid = StringConstraint(20) # binary format 20-byte SHA1 hash
@@ -16,6 +16,8 @@ RIBucketWriter_ = Any()
RIBucketReader_ = Any()
RIMutableDirectoryNode_ = Any()
RIMutableFileNode_ = Any()
def SetOf(*args, **kwargs): return Any()
def DictOf(*args, **kwargs): return Any()

class RIIntroducerClient(RemoteInterface):
    def new_peers(pburls=SetOf(PBURL)):
@@ -25,11 +27,7 @@ class RIIntroducer(RemoteInterface):
    def hello(node=RIIntroducerClient, pburl=PBURL):
        return None

class RIQueenRoster(RemoteInterface):
    def get_global_vdrive():
        return RIMutableDirectoryNode_ # the virtual drive root

class RIClient(RIIntroducerClient):
class RIClient(RemoteInterface):
    def get_service(name=str):
        return Referenceable_
    def get_nodeid():
@@ -1,16 +1,11 @@
from foolscap import Referenceable, DeadReferenceError

import re
from zope.interface import implements
from twisted.application import service
from twisted.python import log
from twisted.internet.error import ConnectionLost, ConnectionDone
from zope.interface import implements
from allmydata.interfaces import RIIntroducer

def sendOnly(call, methname, *args, **kwargs):
    d = call(methname, *args, **kwargs)
    def _trap(f):
        f.trap(DeadReferenceError, ConnectionLost, ConnectionDone)
    d.addErrback(_trap)
from foolscap import Referenceable
from allmydata.interfaces import RIIntroducer, RIIntroducerClient
from allmydata.util import idlib, observer

class Introducer(service.MultiService, Referenceable):
    implements(RIIntroducer)
@@ -21,9 +16,9 @@ class Introducer(service.MultiService, Referenceable):
        self.pburls = set()

    def remote_hello(self, node, pburl):
        log.msg("roster: new contact at %s, node is %s" % (pburl, node))
        log.msg("introducer: new contact at %s, node is %s" % (pburl, node))
        def _remove():
            log.msg(" roster: removing %s %s" % (node, pburl))
            log.msg(" introducer: removing %s %s" % (node, pburl))
            self.nodes.remove(node)
            self.pburls.remove(pburl)
        node.notifyOnDisconnect(_remove)
@@ -32,3 +27,64 @@ class Introducer(service.MultiService, Referenceable):
        for othernode in self.nodes:
            othernode.callRemote("new_peers", set([pburl]))
        self.nodes.add(node)


class IntroducerClient(service.Service, Referenceable):
    implements(RIIntroducerClient)

    def __init__(self, tub, introducer_pburl, my_pburl):
        self.tub = tub
        self.introducer_pburl = introducer_pburl
        self.my_pburl = my_pburl

        self.connections = {} # k: nodeid, v: ref
        self.reconnectors = {} # k: PBURL, v: reconnector

        self.connection_observers = observer.ObserverList()

    def startService(self):
        self.introducer_reconnector = self.tub.connectTo(self.introducer_pburl,
                                                         self._got_introducer)

    def log(self, msg):
        self.parent.log(msg)

    def remote_new_peers(self, pburls):
        for pburl in pburls:
            self._new_peer(pburl)

    def stopService(self):
        service.Service.stopService(self)
        self.introducer_reconnector.stopConnecting()
        for reconnector in self.reconnectors.itervalues():
            reconnector.stopConnecting()

    def _new_peer(self, pburl):
        if pburl in self.reconnectors:
            return
        m = re.match(r'pb://(\w+)@', pburl)
        assert m
        nodeid = idlib.a2b(m.group(1))
        def _got_peer(rref):
            self.log(" connected to(%s)" % idlib.b2a(nodeid))
            self.connection_observers.notify(nodeid, rref)
            self.connections[nodeid] = rref
            def _lost():
                # TODO: notifyOnDisconnect uses eventually(), but connects do
                # not. Could this cause a problem?
                del self.connections[nodeid]
            rref.notifyOnDisconnect(_lost)
        self.log(" connecting to(%s)" % pburl)
        self.reconnectors[pburl] = self.tub.connectTo(pburl, _got_peer)

    def _got_introducer(self, introducer):
        self.log(" introducing ourselves: %s, %s" % (self, self.my_pburl))
        d = introducer.callRemote("hello",
                                  node=self,
                                  pburl=self.my_pburl)

    def notify_on_new_connection(self, cb):
        """Register a callback that will be fired (with nodeid, rref) when
        a new connection is established."""
        self.connection_observers.subscribe(cb)
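The new _new_peer() above derives a peer's nodeid directly from its PBURL instead of asking the peer over the wire (the old get_nodeid round trip, deleted below). A standalone sketch of that parsing step; base64.b32decode stands in for allmydata.util.idlib.a2b, and the tub ID in the example is made up:

    import re, base64

    def nodeid_from_pburl(pburl):
        # The tub ID between "pb://" and "@" is the base32-encoded SHA1 nodeid.
        m = re.match(r'pb://(\w+)@', pburl)
        assert m, "not a PBURL: %r" % pburl
        return base64.b32decode(m.group(1).upper())

    example = "pb://abcdefghijklmnopqrstuvwxyz234567@localhost:1234/introducer"
    print(len(nodeid_from_pburl(example)))  # 20 bytes, SHA1-sized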
@@ -1,45 +0,0 @@
class IntroducerClient(Referenceable):
    implements(RIIntroducerClient)

    def __init__(self, tub, introducer_pburl, my_pburl):
        self.introducer_reconnector = self.tub.connectTo(introducer_pburl,
                                                         self._got_introducer)

        self.tub = tub
        self.my_pburl = my_pburl

        self.connections = {} # k: nodeid, v: ref
        self.reconnectors = {} # k: PBURL, v: reconnector

    def remote_get_nodeid(self):
        return self.nodeid

    def remote_new_peers(self, pburls):
        for pburl in pburls:
            self._new_peer(pburl)

    def stop(self):
        self.introducer_reconnector.stopConnecting()
        for reconnector in self.reconnectors.itervalues():
            reconnector.stopConnecting()

    def _new_peer(self, pburl):
        if pburl in self.reconnectors:
            return
        def _got_peer(rref):
            d2 = rref.callRemote("get_nodeid")
            def _got_nodeid(nodeid):
                self.connections[nodeid] = rref
                def _lost():
                    # TODO: notifyOnDisconnect uses eventually(), but connects do not. Could this cause a problem?
                    del self.connections[nodeid]
                rref.notifyOnDisconnect(_lost)
            d2.addCallback(_got_nodeid)
        log.msg(" connecting to(%s)" % pburl)
        self.reconnectors[pburl] = self.tub.connectTo(pburl, _got_peer)

    def _got_introducer(self, introducer):
        log.msg(" introducing ourselves: %s, %s" % (self, self.my_pburl))
        d = introducer.callRemote("hello",
                                  node=self,
                                  pburl=self.my_pburl)
@@ -30,6 +30,9 @@ class Node(service.MultiService):
        f = open(certfile, "wb")
        f.write(self.tub.getCertData())
        f.close()
        if False: # TODO: once foolscap-0.1.1 is released, enable this
            self.tub.setOption("logLocalFailures", True)
            self.tub.setOption("logRemoteFailures", True)
        self.nodeid = idlib.a2b(self.tub.tubID)
        f = open(os.path.join(self.basedir, self.NODEIDFILE), "w")
        f.write(idlib.b2a(self.nodeid) + "\n")
@@ -1,35 +1,8 @@

import os.path
from foolscap import Referenceable, DeadReferenceError
from foolscap.eventual import eventually
from twisted.application import service
from twisted.python import log
from twisted.internet.error import ConnectionLost, ConnectionDone
from allmydata.util import idlib
from zope.interface import implements
from allmydata.interfaces import RIQueenRoster, RIIntroducer
from allmydata import node
from allmydata.filetable import GlobalVirtualDrive

def sendOnly(call, methname, *args, **kwargs):
    d = call(methname, *args, **kwargs)
    def _trap(f):
        f.trap(DeadReferenceError, ConnectionLost, ConnectionDone)
    d.addErrback(_trap)

class Roster(service.MultiService, Referenceable):
    implements(RIQueenRoster)

    def __init__(self):
        self.gvd_root = None

    def set_gvd_root(self, root):
        self.gvd_root = root

    def remote_get_global_vdrive(self):
        return self.gvd_root

from allmydata.introducer import Introducer


class Queen(node.Node):
@@ -39,15 +12,21 @@ class Queen(node.Node):

    def __init__(self, basedir="."):
        node.Node.__init__(self, basedir)
        self.gvd = self.add_service(GlobalVirtualDrive(basedir))
        self.urls = {}

    def tub_ready(self):
        r = self.add_service(Roster())
        self.urls["roster"] = self.tub.registerReference(r, "roster")
        self.log(" roster is at %s" % self.urls["roster"])
        f = open(os.path.join(self.basedir, "roster_pburl"), "w")
        f.write(self.urls["roster"] + "\n")
        r = self.add_service(Introducer())
        self.urls["introducer"] = self.tub.registerReference(r, "introducer")
        self.log(" introducer is at %s" % self.urls["introducer"])
        f = open(os.path.join(self.basedir, "introducer.furl"), "w")
        f.write(self.urls["introducer"] + "\n")
        f.close()

        gvd = self.add_service(GlobalVirtualDrive(self.basedir))
        self.urls["vdrive"] = self.tub.registerReference(gvd.get_root(),
                                                         "vdrive")
        self.log(" vdrive is at %s" % self.urls["vdrive"])
        f = open(os.path.join(self.basedir, "vdrive.furl"), "w")
        f.write(self.urls["vdrive"] + "\n")
        f.close()
        r.set_gvd_root(self.gvd.get_root())
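The queen-side pattern above — register a Referenceable with the Tub under a stable name and persist the resulting FURL so other nodes can copy it — reduces to a small helper. This is only a sketch of the idiom using the same foolscap calls that appear in the diff; publish_furl is a hypothetical name, not something in the tree:

    import os

    def publish_furl(tub, basedir, name, referenceable):
        # Register under a stable name so the FURL survives restarts, then
        # write it to <basedir>/<name>.furl for other nodes to copy.
        furl = tub.registerReference(referenceable, name)
        f = open(os.path.join(basedir, name + ".furl"), "w")
        f.write(furl + "\n")
        f.close()
        return furl

With a helper like this, the new tub_ready() would reduce to two calls: one publishing the Introducer as "introducer" and one publishing the GlobalVirtualDrive root as "vdrive".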
@@ -122,7 +122,8 @@ def create_client(config):
    f = open(os.path.join(basedir, "client.tac"), "w")
    f.write(client_tac)
    f.close()
    print "client created in %s, please copy roster_pburl into the directory" % basedir
    print "client created in %s" % basedir
    print " please copy introducer.furl and vdrive.furl into the directory"

def create_queen(config):
    basedir = config['basedir']
@@ -65,13 +65,19 @@ class SystemFramework:

    def make_nodes(self):
        q = self.queen
        self.queen_pburl = q.urls["roster"]
        self.queen_pburl = q.urls["introducer"]
        vdrive_furl = q.urls["vdrive"]
        self.nodes = []
        for i in range(self.numnodes):
            nodedir = os.path.join(self.basedir, "node%d" % i)
            os.mkdir(nodedir)
            f = open(os.path.join(nodedir, "introducer.furl"), "w")
            f.write(self.queen_pburl)
            f.close()
            f = open(os.path.join(nodedir, "vdrive.furl"), "w")
            f.write(vdrive_furl)
            f.close()
            c = self.add_service(client.Client(basedir=nodedir))
            c.set_queen_pburl(self.queen_pburl)
            self.nodes.append(c)
        # the peers will start running, eventually they will connect to each
        # other and the queen
@@ -81,7 +87,7 @@ class SystemFramework:
        f.write("If the node notices this file at startup, it will poll and\n")
        f.write("terminate as soon as the file goes away. This prevents\n")
        f.write("leaving processes around if the test harness has an\n")
        f.write("internal failure and neglects to kil off the node\n")
        f.write("internal failure and neglects to kill off the node\n")
        f.write("itself. The contents of this file are ignored.\n")
        f.close()

@@ -91,7 +97,7 @@ class SystemFramework:
        config = {'basedir': clientdir}
        runner.create_client(config)
        log.msg("DONE MAKING CLIENT")
        f = open(os.path.join(clientdir, "roster_pburl"), "w")
        f = open(os.path.join(clientdir, "introducer.furl"), "w")
        f.write(self.queen_pburl + "\n")
        f.close()
        self.keepalive_file = os.path.join(clientdir, "suicide_prevention_hotline")
@@ -1,17 +1,31 @@

import os
from twisted.trial import unittest

from allmydata import client

class MyClient(client.Client):
    def __init__(self, basedir):
        self.connections = {}
        client.Client.__init__(self, basedir)

    def get_all_peerids(self):
        return self.connections

class Basic(unittest.TestCase):
    def test_loadable(self):
        c = client.Client("")
        d = c.startService()
        d.addCallback(lambda res: c.stopService())
        return d
        basedir = "test_client.Basic.test_loadable"
        os.mkdir(basedir)
        open(os.path.join(basedir, "introducer.furl"), "w").write("")
        open(os.path.join(basedir, "vdrive.furl"), "w").write("")
        c = client.Client(basedir)

    def test_permute(self):
        c = client.Client("")
        basedir = "test_client.Basic.test_permute"
        os.mkdir(basedir)
        open(os.path.join(basedir, "introducer.furl"), "w").write("")
        open(os.path.join(basedir, "vdrive.furl"), "w").write("")
        c = MyClient(basedir)
        for k in ["%d" % i for i in range(5)]:
            c.connections[k] = None
        self.failUnlessEqual(c.permute_peerids("one"), ['3','1','0','4','2'])
@@ -20,7 +34,7 @@ class Basic(unittest.TestCase):
        c.connections.clear()
        self.failUnlessEqual(c.permute_peerids("one"), [])

        c2 = client.Client("")
        c2 = MyClient(basedir)
        for k in ["%d" % i for i in range(5)]:
            c2.connections[k] = None
        self.failUnlessEqual(c2.permute_peerids("one"), ['3','1','0','4','2'])
src/allmydata/test/test_introducer.py (new file, 234 lines)
@@ -0,0 +1,234 @@

from twisted.trial import unittest
from twisted.internet import defer, reactor
from twisted.python import log
defer.setDebugging(True)

from foolscap import Tub, Referenceable
from twisted.application import service
from allmydata.introducer import IntroducerClient, Introducer
from allmydata.util import idlib

class MyNode(Referenceable):
    pass

class LoggingMultiService(service.MultiService):
    def log(self, msg):
        pass

class TestIntroducer(unittest.TestCase):
    def setUp(self):
        self.parent = LoggingMultiService()
        self.parent.startService()
    def tearDown(self):
        log.msg("TestIntroducer.tearDown")
        d = defer.Deferred()
        reactor.callLater(1.1, d.callback, None)
        d.addCallback(lambda res: self.parent.stopService())
        return d


    def poll(self, check_f, pollinterval=0.01):
        # Return a Deferred, then call check_f periodically until it returns
        # True, at which point the Deferred will fire.. If check_f raises an
        # exception, the Deferred will errback.
        d = defer.maybeDeferred(self._poll, None, check_f, pollinterval)
        return d

    def _poll(self, res, check_f, pollinterval):
        if check_f():
            return True
        d = defer.Deferred()
        d.addCallback(self._poll, check_f, pollinterval)
        reactor.callLater(pollinterval, d.callback, None)
        return d


    def test_create(self):
        ic = IntroducerClient(None, "introducer", "mypburl")
        def _ignore(nodeid, rref):
            pass
        ic.notify_on_new_connection(_ignore)

    def test_listen(self):
        i = Introducer()
        i.setServiceParent(self.parent)

    def test_system(self):

        self.central_tub = tub = Tub()
        #tub.setOption("logLocalFailures", True)
        #tub.setOption("logRemoteFailures", True)
        tub.setServiceParent(self.parent)
        l = tub.listenOn("tcp:0")
        portnum = l.getPortnum()
        tub.setLocation("localhost:%d" % portnum)

        i = Introducer()
        i.setServiceParent(self.parent)
        iurl = tub.registerReference(i)
        NUMCLIENTS = 5

        self.waiting_for_connections = NUMCLIENTS*NUMCLIENTS
        d = self._done_counting = defer.Deferred()
        def _count(nodeid, rref):
            log.msg("NEW CONNECTION! %s %s" % (idlib.b2a(nodeid), rref))
            self.waiting_for_connections -= 1
            if self.waiting_for_connections == 0:
                self._done_counting.callback("done!")

        clients = []
        tubs = {}
        for i in range(NUMCLIENTS):
            tub = Tub()
            #tub.setOption("logLocalFailures", True)
            #tub.setOption("logRemoteFailures", True)
            tub.setServiceParent(self.parent)
            l = tub.listenOn("tcp:0")
            portnum = l.getPortnum()
            tub.setLocation("localhost:%d" % portnum)

            n = MyNode()
            node_pburl = tub.registerReference(n)
            c = IntroducerClient(tub, iurl, node_pburl)
            c.notify_on_new_connection(_count)
            c.setServiceParent(self.parent)
            clients.append(c)
            tubs[c] = tub

        # d will fire once everybody is connected

        def _check(res):
            log.msg("doing _check")
            for c in clients:
                self.failUnlessEqual(len(c.connections), NUMCLIENTS)
            # now disconnect somebody's connection to someone else
            self.waiting_for_connections = 2
            d2 = self._done_counting = defer.Deferred()
            origin_c = clients[0]
            # find a target that is not themselves
            for nodeid,rref in origin_c.connections.items():
                if idlib.b2a(nodeid) != tubs[origin_c].tubID:
                    victim = rref
                    break
            log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
            victim.tracker.broker.transport.loseConnection()
            log.msg(" did disconnect")
            return d2
        d.addCallback(_check)
        def _check_again(res):
            log.msg("doing _check_again")
            for c in clients:
                self.failUnlessEqual(len(c.connections), NUMCLIENTS)
            # now disconnect somebody's connection to themselves. This will
            # only result in one new connection, since it is a loopback.
            self.waiting_for_connections = 1
            d2 = self._done_counting = defer.Deferred()
            origin_c = clients[0]
            # find a target that *is* themselves
            for nodeid,rref in origin_c.connections.items():
                if idlib.b2a(nodeid) == tubs[origin_c].tubID:
                    victim = rref
                    break
            log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
            victim.tracker.broker.transport.loseConnection()
            log.msg(" did disconnect")
            return d2
        d.addCallback(_check_again)
        def _check_again2(res):
            log.msg("doing _check_again2")
            for c in clients:
                self.failUnlessEqual(len(c.connections), NUMCLIENTS)
            # now disconnect somebody's connection to themselves
        d.addCallback(_check_again2)
        return d

    def stall(self, res, timeout):
        d = defer.Deferred()
        reactor.callLater(timeout, d.callback, res)
        return d

    def test_system_this_one_breaks(self):
        # this uses a single Tub, which has a strong effect on the
        # failingness
        tub = Tub()
        tub.setOption("logLocalFailures", True)
        tub.setOption("logRemoteFailures", True)
        tub.setServiceParent(self.parent)
        l = tub.listenOn("tcp:0")
        portnum = l.getPortnum()
        tub.setLocation("localhost:%d" % portnum)

        i = Introducer()
        i.setServiceParent(self.parent)
        iurl = tub.registerReference(i)

        clients = []
        for i in range(5):
            n = MyNode()
            node_pburl = tub.registerReference(n)
            c = IntroducerClient(tub, iurl, node_pburl)
            c.setServiceParent(self.parent)
            clients.append(c)

        # time passes..
        d = defer.Deferred()
        def _check(res):
            log.msg("doing _check")
            self.failUnlessEqual(len(clients[0].connections), 5)
        d.addCallback(_check)
        reactor.callLater(2, d.callback, None)
        return d
    del test_system_this_one_breaks


    def test_system_this_one_breaks_too(self):
        # this one shuts down so quickly that it fails in a different way
        self.central_tub = tub = Tub()
        tub.setOption("logLocalFailures", True)
        tub.setOption("logRemoteFailures", True)
        tub.setServiceParent(self.parent)
        l = tub.listenOn("tcp:0")
        portnum = l.getPortnum()
        tub.setLocation("localhost:%d" % portnum)

        i = Introducer()
        i.setServiceParent(self.parent)
        iurl = tub.registerReference(i)

        clients = []
        for i in range(5):
            tub = Tub()
            tub.setOption("logLocalFailures", True)
            tub.setOption("logRemoteFailures", True)
            tub.setServiceParent(self.parent)
            l = tub.listenOn("tcp:0")
            portnum = l.getPortnum()
            tub.setLocation("localhost:%d" % portnum)

            n = MyNode()
            node_pburl = tub.registerReference(n)
            c = IntroducerClient(tub, iurl, node_pburl)
            c.setServiceParent(self.parent)
            clients.append(c)

        # time passes..
        d = defer.Deferred()
        reactor.callLater(0.01, d.callback, None)
        def _check(res):
            log.msg("doing _check")
            self.fail("BOOM")
            for c in clients:
                self.failUnlessEqual(len(c.connections), 5)
            c.connections.values()[0].tracker.broker.transport.loseConnection()
            return self.stall(None, 2)
        d.addCallback(_check)
        def _check_again(res):
            log.msg("doing _check_again")
            for c in clients:
                self.failUnlessEqual(len(c.connections), 5)
        d.addCallback(_check_again)
        return d
    del test_system_this_one_breaks_too
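The poll()/_poll() helper in the new test above is a handy pattern for these system tests: re-check a condition on a timer and fire a Deferred once it holds. A compressed sketch of the same idea, with a hypothetical usage line; it assumes a running twisted reactor, as under trial:

    from twisted.internet import defer, reactor

    def poll(check_f, pollinterval=0.01):
        # Fire the returned Deferred once check_f() returns true; errback if it raises.
        d = defer.Deferred()
        def _poll():
            try:
                if check_f():
                    d.callback(None)
                else:
                    reactor.callLater(pollinterval, _poll)
            except Exception:
                d.errback()
        _poll()
        return d

    # e.g. inside a test method:
    #   return poll(lambda: len(c.connections) == NUMCLIENTS)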
@@ -47,18 +47,18 @@ class SystemTest(unittest.TestCase):

    def _set_up_nodes_2(self, res):
        q = self.queen
        self.queen_pburl = q.urls["roster"]
        self.queen_furl = q.urls["introducer"]
        self.vdrive_furl = q.urls["vdrive"]
        self.clients = []
        for i in range(self.numclients):
            basedir = "client%d" % i
            if not os.path.isdir(basedir):
                os.mkdir(basedir)
            if i == 0:
                f = open(os.path.join(basedir, "webport"), "w")
                f.write("tcp:0:interface=127.0.0.1")
                f.close()
                open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
            open(os.path.join(basedir, "introducer.furl"), "w").write(self.queen_furl)
            open(os.path.join(basedir, "vdrive.furl"), "w").write(self.vdrive_furl)
            c = self.add_service(client.Client(basedir=basedir))
            c.set_queen_pburl(self.queen_pburl)
            self.clients.append(c)
        log.msg("STARTING")
        d = self.wait_for_connections()
@@ -76,9 +76,11 @@ class SystemTest(unittest.TestCase):
        basedir = "client%d" % client_num
        if not os.path.isdir(basedir):
            os.mkdir(basedir)
        open(os.path.join(basedir, "introducer.furl"), "w").write(self.queen_furl)
        open(os.path.join(basedir, "vdrive.furl"), "w").write(self.vdrive_furl)

        c = client.Client(basedir=basedir)
        self.clients.append(c)
        c.set_queen_pburl(self.queen_pburl)
        self.numclients += 1
        c.startService()
        d = self.wait_for_connections()
@@ -87,7 +89,7 @@ class SystemTest(unittest.TestCase):

    def wait_for_connections(self, ignored=None):
        for c in self.clients:
            if len(c.connections) != self.numclients:
            if not c.introducer_client or len(c.get_all_peerids()) != self.numclients:
                d = defer.Deferred()
                d.addCallback(self.wait_for_connections)
                reactor.callLater(0.05, d.callback, None)
@@ -96,18 +98,21 @@ class SystemTest(unittest.TestCase):

    def test_connections(self):
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(5))
        def _check(extra_node):
            self.extra_node = extra_node
            for c in self.clients:
                self.failUnlessEqual(len(c.connections), 6)
                self.failUnlessEqual(len(c.get_all_peerids()), 6)
        d.addCallback(_check)
        def _shutdown_extra_node(res):
            d1 = self.extra_node.stopService()
            d2 = defer.Deferred()
            reactor.callLater(self.DISCONNECT_DELAY, d2.callback, res)
            d1.addCallback(lambda ignored: d2)
            return d1
            if self.extra_node:
                d1 = self.extra_node.stopService()
                d2 = defer.Deferred()
                reactor.callLater(self.DISCONNECT_DELAY, d2.callback, res)
                d1.addCallback(lambda ignored: d2)
                return d1
            return res
        d.addBoth(_shutdown_extra_node)
        return d
@@ -23,13 +23,9 @@
    <table n:render="sequence" n:data="peers" border="1">
      <tr n:pattern="header">
        <td>PeerID</td>
        <td>Connected?</td>
        <td>PBURL</td>
      </tr>
      <tr n:pattern="item" n:render="row">
        <td><tt><n:slot name="peerid"/></tt></td>
        <td><n:slot name="connected"/></td>
        <td><n:slot name="pburl"/></td>
      </tr>
      <tr n:pattern="empty"><td>no peers!</td></tr>
    </table>
@@ -28,31 +28,29 @@ class Welcome(rend.Page):
    docFactory = getxmlfile("welcome.xhtml")

    def data_queen_pburl(self, ctx, data):
        return IClient(ctx).queen_pburl
        return IClient(ctx).introducer_furl
    def data_connected_to_queen(self, ctx, data):
        if IClient(ctx).queen:
        if IClient(ctx).connected_to_vdrive:
            return "yes"
        return "no"
    def data_num_peers(self, ctx, data):
        #client = inevow.ISite(ctx)._client
        client = IClient(ctx)
        return len(client.connections)
        return len(client.get_all_peerids())
    def data_num_connected_peers(self, ctx, data):
        return len(IClient(ctx).connections)
        return len(IClient(ctx).get_all_peerids())

    def data_peers(self, ctx, data):
        d = []
        client = IClient(ctx)
        for nodeid in sorted(client.connections.keys()):
            row = (idlib.b2a(nodeid), "yes", "?")
        for nodeid in sorted(client.get_all_peerids()):
            row = (idlib.b2a(nodeid),)
            d.append(row)
        return d

    def render_row(self, ctx, data):
        nodeid_a, connected, pburl = data
        (nodeid_a,) = data
        ctx.fillSlots("peerid", nodeid_a)
        ctx.fillSlots("connected", connected)
        ctx.fillSlots("pburl", pburl)
        return ctx.tag

    # this is a form where users can download files by URI