mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-01-21 20:08:15 +00:00
incomplete work to be finished elsewhere
This commit is contained in:
parent
1264d77fd5
commit
e48a8b8e66
@ -29,7 +29,6 @@ class Client(node.Node, Referenceable):
|
||||
def __init__(self, basedir="."):
|
||||
node.Node.__init__(self, basedir)
|
||||
self.queen = None # self.queen is either None or a RemoteReference
|
||||
self.my_pburl = None
|
||||
self.all_peers = set()
|
||||
self.peer_pburls = {}
|
||||
self.connections = {}
|
||||
@ -53,7 +52,6 @@ class Client(node.Node, Referenceable):
|
||||
|
||||
def tub_ready(self):
|
||||
self.my_pburl = self.tub.registerReference(self)
|
||||
self.register_control()
|
||||
self.maybe_connect_to_queen()
|
||||
|
||||
def set_queen_pburl(self, queen_pburl):
|
||||
@ -73,29 +71,13 @@ class Client(node.Node, Referenceable):
|
||||
self.queen_connector = self.tub.connectTo(self.queen_pburl,
|
||||
self._got_queen)
|
||||
|
||||
def register_control(self):
|
||||
c = ControlServer()
|
||||
c.setServiceParent(self)
|
||||
control_url = self.tub.registerReference(c)
|
||||
f = open("control.pburl", "w")
|
||||
f.write(control_url + "\n")
|
||||
f.close()
|
||||
os.chmod("control.pburl", 0600)
|
||||
|
||||
def stopService(self):
|
||||
if self.queen_connector:
|
||||
self.queen_connector.stopConnecting()
|
||||
self.queen_connector = None
|
||||
if self.introducer_client:
|
||||
self.introducer_client.stop()
|
||||
return service.MultiService.stopService(self)
|
||||
|
||||
def _got_queen(self, queen):
|
||||
self.log("connected to queen")
|
||||
self.queen = queen
|
||||
queen.notifyOnDisconnect(self._lost_queen)
|
||||
d = queen.callRemote("hello",
|
||||
nodeid=self.nodeid,
|
||||
node=self,
|
||||
pburl=self.my_pburl)
|
||||
d.addCallback(lambda x: queen.callRemote("get_global_vdrive"))
|
||||
d.addCallback(self._got_vdrive_root)
|
||||
|
||||
@ -104,44 +86,10 @@ class Client(node.Node, Referenceable):
|
||||
if "webish" in self.namedServices:
|
||||
self.getServiceNamed("webish").set_root_dirnode(root)
|
||||
|
||||
def _lost_queen(self):
|
||||
self.log("lost connection to queen")
|
||||
self.queen = None
|
||||
|
||||
def remote_get_service(self, name):
|
||||
# TODO: 'vdrive' should not be public in the medium term
|
||||
return self.getServiceNamed(name)
|
||||
|
||||
def remote_add_peers(self, new_peers):
|
||||
for nodeid, pburl in new_peers:
|
||||
self.log("adding peer %s" % idlib.b2a(nodeid))
|
||||
if nodeid in self.all_peers:
|
||||
self.log("weird, I already had an entry for them")
|
||||
return
|
||||
self.all_peers.add(nodeid)
|
||||
self.peer_pburls[nodeid] = pburl
|
||||
if nodeid not in self.connections:
|
||||
d = self.tub.getReference(pburl)
|
||||
def _got_reference(ref, which_nodeid):
|
||||
self.log("connected to %s" % idlib.b2a(which_nodeid))
|
||||
if which_nodeid in self.all_peers:
|
||||
self.connections[which_nodeid] = ref
|
||||
else:
|
||||
log.msg(" ignoring it because we no longer want to talk to them")
|
||||
d.addCallback(_got_reference, nodeid)
|
||||
|
||||
def remote_lost_peers(self, lost_peers):
|
||||
for nodeid in lost_peers:
|
||||
self.log("lost peer %s" % idlib.b2a(nodeid))
|
||||
if nodeid in self.all_peers:
|
||||
self.all_peers.remove(nodeid)
|
||||
else:
|
||||
self.log("weird, I didn't have an entry for them")
|
||||
if nodeid in self.peer_pburls:
|
||||
del self.peer_pburls[nodeid]
|
||||
if nodeid in self.connections:
|
||||
del self.connections[nodeid]
|
||||
|
||||
def get_remote_service(self, nodeid, servicename):
|
||||
if nodeid not in self.connections:
|
||||
return defer.fail(IndexError("no connection to that peer"))
|
||||
|
@ -1,6 +1,6 @@
|
||||
|
||||
from zope.interface import Interface
|
||||
from foolscap.schema import StringConstraint, ListOf, TupleOf, Any, DictOf
|
||||
from foolscap.schema import StringConstraint, ListOf, TupleOf, Any
|
||||
from foolscap import RemoteInterface
|
||||
|
||||
Nodeid = StringConstraint(20) # binary format 20-byte SHA1 hash
|
||||
@ -8,7 +8,7 @@ PBURL = StringConstraint(150)
|
||||
Verifierid = StringConstraint(20)
|
||||
URI = StringConstraint(100) # kind of arbitrary
|
||||
ShareData = StringConstraint(100000)
|
||||
# these four are here because Foolscap does not yet support the kind of
|
||||
# these six are here because Foolscap does not yet support the kind of
|
||||
# restriction I really want to apply to these.
|
||||
RIClient_ = Any()
|
||||
Referenceable_ = Any()
|
||||
@ -17,21 +17,23 @@ RIBucketReader_ = Any()
|
||||
RIMutableDirectoryNode_ = Any()
|
||||
RIMutableFileNode_ = Any()
|
||||
|
||||
class RIQueenRoster(RemoteInterface):
|
||||
def hello(nodeid=Nodeid, node=RIClient_, pburl=PBURL):
|
||||
class RIIntroducerClient(RemoteInterface):
|
||||
def new_peers(pburls=SetOf(PBURL)):
|
||||
return None
|
||||
|
||||
class RIIntroducer(RemoteInterface):
|
||||
def hello(node=RIIntroducerClient, pburl=PBURL):
|
||||
return None
|
||||
|
||||
class RIQueenRoster(RemoteInterface):
|
||||
def get_global_vdrive():
|
||||
return RIMutableDirectoryNode_ # the virtual drive root
|
||||
|
||||
|
||||
class RIClient(RemoteInterface):
|
||||
class RIClient(RIIntroducerClient):
|
||||
def get_service(name=str):
|
||||
return Referenceable_
|
||||
def add_peers(new_peers=ListOf(TupleOf(Nodeid, PBURL), maxLength=100)):
|
||||
return None
|
||||
def lost_peers(lost_peers=ListOf(Nodeid)):
|
||||
return None
|
||||
def get_nodeid():
|
||||
return Nodeid
|
||||
|
||||
class RIStorageServer(RemoteInterface):
|
||||
def allocate_bucket(verifierid=Verifierid, bucket_num=int, size=int,
|
||||
|
34
src/allmydata/introducer.py
Normal file
34
src/allmydata/introducer.py
Normal file
@ -0,0 +1,34 @@
|
||||
from foolscap import Referenceable, DeadReferenceError
|
||||
from twisted.application import service
|
||||
from twisted.python import log
|
||||
from twisted.internet.error import ConnectionLost, ConnectionDone
|
||||
from zope.interface import implements
|
||||
from allmydata.interfaces import RIIntroducer
|
||||
|
||||
|
||||
def sendOnly(call, methname, *args, **kwargs):
|
||||
d = call(methname, *args, **kwargs)
|
||||
def _trap(f):
|
||||
f.trap(DeadReferenceError, ConnectionLost, ConnectionDone)
|
||||
d.addErrback(_trap)
|
||||
|
||||
class Introducer(service.MultiService, Referenceable):
|
||||
implements(RIIntroducer)
|
||||
|
||||
def __init__(self):
|
||||
service.MultiService.__init__(self)
|
||||
self.nodes = set()
|
||||
self.pburls = set()
|
||||
|
||||
def remote_hello(self, node, pburl):
|
||||
log.msg("roster: new contact at %s, node is %s" % (pburl, node))
|
||||
def _remove():
|
||||
log.msg(" roster: removing %s %s" % (node, pburl))
|
||||
self.nodes.remove(node)
|
||||
self.pburls.remove(pburl)
|
||||
node.notifyOnDisconnect(_remove)
|
||||
self.pburls.add(pburl)
|
||||
node.callRemote("new_peers", self.pburls)
|
||||
for othernode in self.nodes:
|
||||
othernode.callRemote("new_peers", set([pburl]))
|
||||
self.nodes.add(node)
|
45
src/allmydata/introducerclient.py
Normal file
45
src/allmydata/introducerclient.py
Normal file
@ -0,0 +1,45 @@
|
||||
class IntroducerClient(Referenceable):
|
||||
implements(RIIntroducerClient)
|
||||
|
||||
def __init__(self, tub, introducer_pburl, my_pburl):
|
||||
self.introducer_reconnector = self.tub.connectTo(introducer_pburl,
|
||||
self._got_introducer)
|
||||
|
||||
self.tub = tub
|
||||
self.my_pburl = my_pburl
|
||||
|
||||
self.connections = {} # k: nodeid, v: ref
|
||||
self.reconnectors = {} # k: PBURL, v: reconnector
|
||||
|
||||
def remote_get_nodeid(self):
|
||||
return self.nodeid
|
||||
|
||||
def remote_new_peers(self, pburls):
|
||||
for pburl in pburls:
|
||||
self._new_peer(pburl)
|
||||
|
||||
def stop(self):
|
||||
self.introducer_reconnector.stopConnecting()
|
||||
for reconnector in self.reconnectors.itervalues():
|
||||
reconnector.stopConnecting()
|
||||
|
||||
def _new_peer(self, pburl):
|
||||
if pburl in self.reconnectors:
|
||||
return
|
||||
def _got_peer(rref):
|
||||
d2 = rref.callRemote("get_nodeid")
|
||||
def _got_nodeid(nodeid):
|
||||
self.connections[nodeid] = rref
|
||||
def _lost():
|
||||
# TODO: notifyOnDisconnect uses eventually(), but connects do not. Could this cause a problem?
|
||||
del self.connections[nodeid]
|
||||
rref.notifyOnDisconnect(_lost)
|
||||
d2.addCallback(_got_nodeid)
|
||||
log.msg(" connecting to(%s)" % pburl)
|
||||
self.reconnectors[pburl] = self.tub.connectTo(pburl, _got_peer)
|
||||
|
||||
def _got_introducer(self, introducer):
|
||||
log.msg(" introducing ourselves: %s, %s" % (self, self.my_pburl))
|
||||
d = introducer.callRemote("hello",
|
||||
node=self,
|
||||
pburl=self.my_pburl)
|
@ -7,7 +7,7 @@ from twisted.python import log
|
||||
from twisted.internet.error import ConnectionLost, ConnectionDone
|
||||
from allmydata.util import idlib
|
||||
from zope.interface import implements
|
||||
from allmydata.interfaces import RIQueenRoster
|
||||
from allmydata.interfaces import RIQueenRoster, RIIntroducer
|
||||
from allmydata import node
|
||||
from allmydata.filetable import GlobalVirtualDrive
|
||||
|
||||
@ -22,51 +22,14 @@ class Roster(service.MultiService, Referenceable):
|
||||
implements(RIQueenRoster)
|
||||
|
||||
def __init__(self):
|
||||
service.MultiService.__init__(self)
|
||||
self.phonebook = {}
|
||||
self.connections = {}
|
||||
self.gvd_root = None
|
||||
|
||||
def set_gvd_root(self, root):
|
||||
self.gvd_root = root
|
||||
|
||||
def remote_hello(self, nodeid, node, pburl):
|
||||
log.msg("roster: contact from %s" % idlib.b2a(nodeid))
|
||||
self.phonebook[nodeid] = pburl
|
||||
self.connections[nodeid] = node
|
||||
eventually(self._educate_the_new_peer,
|
||||
nodeid, node, list(self.phonebook.items()))
|
||||
eventually(self._announce_new_peer,
|
||||
nodeid, pburl, list(self.connections.values()))
|
||||
node.notifyOnDisconnect(self._lost_node, nodeid)
|
||||
|
||||
def remote_get_global_vdrive(self):
|
||||
return self.gvd_root
|
||||
|
||||
def _educate_the_new_peer(self, nodeid, node, new_peers):
|
||||
log.msg("roster: educating %s (%d)" % (idlib.b2a(nodeid)[:4], len(new_peers)))
|
||||
node.callRemote("add_peers", new_peers=new_peers)
|
||||
|
||||
def _announce_new_peer(self, new_nodeid, new_node_pburl, peers):
|
||||
log.msg("roster: announcing %s to everybody (%d)" % (idlib.b2a(new_nodeid)[:4], len(peers)))
|
||||
for targetnode in peers:
|
||||
targetnode.callRemote("add_peers",
|
||||
new_peers=[(new_nodeid, new_node_pburl)])
|
||||
|
||||
def _lost_node(self, nodeid):
|
||||
log.msg("roster: lost contact with %s" % idlib.b2a(nodeid))
|
||||
del self.phonebook[nodeid]
|
||||
del self.connections[nodeid]
|
||||
eventually(self._announce_lost_peer, nodeid)
|
||||
|
||||
def _announce_lost_peer(self, lost_nodeid):
|
||||
for targetnode in self.connections.values():
|
||||
# use sendOnly, because if they go away then we assume it's
|
||||
# because they crashed and they've lost all their peer
|
||||
# connections anyways.
|
||||
sendOnly(targetnode.callRemote, "lost_peers",
|
||||
lost_peers=[lost_nodeid])
|
||||
|
||||
|
||||
|
||||
class Queen(node.Node):
|
||||
|
@ -12,14 +12,16 @@ class Basic(unittest.TestCase):
|
||||
|
||||
def test_permute(self):
|
||||
c = client.Client("")
|
||||
c.all_peers = ["%d" % i for i in range(5)]
|
||||
for k in ["%d" % i for i in range(5)]:
|
||||
c.connections[k] = None
|
||||
self.failUnlessEqual(c.permute_peerids("one"), ['3','1','0','4','2'])
|
||||
self.failUnlessEqual(c.permute_peerids("one", 3), ['3','1','0'])
|
||||
self.failUnlessEqual(c.permute_peerids("two"), ['0','4','2','1','3'])
|
||||
c.all_peers = []
|
||||
c.connections.clear()
|
||||
self.failUnlessEqual(c.permute_peerids("one"), [])
|
||||
|
||||
c2 = client.Client("")
|
||||
c2.all_peers = ["%d" % i for i in range(5)]
|
||||
for k in ["%d" % i for i in range(5)]:
|
||||
c2.connections[k] = None
|
||||
self.failUnlessEqual(c2.permute_peerids("one"), ['3','1','0','4','2'])
|
||||
|
||||
|
@ -36,20 +36,15 @@ class Welcome(rend.Page):
|
||||
def data_num_peers(self, ctx, data):
|
||||
#client = inevow.ISite(ctx)._client
|
||||
client = IClient(ctx)
|
||||
return len(client.all_peers)
|
||||
return len(client.connections)
|
||||
def data_num_connected_peers(self, ctx, data):
|
||||
return len(IClient(ctx).connections)
|
||||
|
||||
def data_peers(self, ctx, data):
|
||||
d = []
|
||||
client = IClient(ctx)
|
||||
for nodeid in sorted(client.all_peers):
|
||||
if nodeid in client.connections:
|
||||
connected = "yes"
|
||||
else:
|
||||
connected = "no"
|
||||
pburl = client.peer_pburls[nodeid]
|
||||
row = (idlib.b2a(nodeid), connected, pburl)
|
||||
for nodeid in sorted(client.connections.keys()):
|
||||
row = (idlib.b2a(nodeid), "yes", "?")
|
||||
d.append(row)
|
||||
return d
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user