2007-03-27 23:12:11 +00:00
|
|
|
|
|
|
|
import re
|
2007-12-03 21:52:42 +00:00
|
|
|
from base64 import b32encode, b32decode
|
2007-03-27 23:12:11 +00:00
|
|
|
from zope.interface import implements
|
2007-03-23 23:15:57 +00:00
|
|
|
from twisted.application import service
|
|
|
|
from twisted.python import log
|
2007-03-27 23:12:11 +00:00
|
|
|
from foolscap import Referenceable
|
2007-12-03 21:52:42 +00:00
|
|
|
from allmydata import node
|
2007-03-27 23:12:11 +00:00
|
|
|
from allmydata.interfaces import RIIntroducer, RIIntroducerClient
|
2007-07-25 01:13:58 +00:00
|
|
|
from allmydata.util import observer
|
2007-03-23 23:15:57 +00:00
|
|
|
|
2007-12-03 21:52:42 +00:00
|
|
|
class IntroducerNode(node.Node):
    """A node whose job is to host an IntroducerService.

    Once the tub is ready, the node creates the service, registers it with
    the tub under the name "introducer", and writes the resulting URL to
    the "introducer.furl" config entry.
    """

    PORTNUMFILE = "introducer.port"
    NODETYPE = "introducer"
    # Name of the optional config entry that overrides the default encoding
    # parameters. Its content is three whitespace-separated integers:
    # "k desired n".
    ENCODING_PARAMETERS_FILE = "encoding_parameters"
    # Encoding parameters used when the config entry above is absent.
    DEFAULT_K, DEFAULT_DESIRED, DEFAULT_N = 3, 7, 10

    def tub_ready(self):
        """Create, attach and publish the IntroducerService.

        The encoding parameters handed to the service are the class
        defaults unless get_config() returns data for
        ENCODING_PARAMETERS_FILE, in which case that data is parsed as
        three integers.

        Raises ValueError if the configured data does not consist of
        exactly three integer fields.
        """
        k, desired, n = self.DEFAULT_K, self.DEFAULT_DESIRED, self.DEFAULT_N
        # Use the class constant rather than repeating the literal name, so
        # the two cannot drift apart.
        data = self.get_config(self.ENCODING_PARAMETERS_FILE)
        if data is not None:
            k, desired, n = [int(field) for field in data.split()]
        introducerservice = IntroducerService(self.basedir, (k, desired, n))
        self.add_service(introducerservice)
        self.introducer_url = self.tub.registerReference(introducerservice,
                                                         "introducer")
        self.log(" introducer is at %s" % self.introducer_url)
        self.write_config("introducer.furl", self.introducer_url + "\n")
|
|
|
|
|
|
|
|
class IntroducerService(service.MultiService, Referenceable):
    """Remotely callable service that introduces client nodes to each other.

    Every client that says hello is told about all FURLs known so far, and
    every previously known client is told about the newcomer's FURL.
    """
    implements(RIIntroducer)
    name = "introducer"

    def __init__(self, basedir=".", encoding_parameters=None):
        """Set up empty peer bookkeeping.

        basedir is accepted but not used by this class.
        encoding_parameters, when not None, is forwarded to every client
        that says hello.
        """
        service.MultiService.__init__(self)
        self.introducer_url = None
        self.nodes = set()   # remote references of connected clients
        self.furls = set()   # FURLs announced by connected clients
        self._encoding_parameters = encoding_parameters

    def remote_hello(self, node, furl):
        """Register a newly connected client and exchange peer information.

        node is the remote reference of the client (it must support
        notifyOnDisconnect and callRemote); furl is the FURL the client
        announces for itself.

        The new client receives the full set of known FURLs (its own
        included) and, if configured, the encoding parameters. All clients
        registered earlier receive just the new FURL.
        """
        log.msg("introducer: new contact at %s, node is %s" % (furl, node))

        def _remove():
            log.msg(" introducer: removing %s %s" % (node, furl))
            # discard() rather than remove(): if the same node or FURL was
            # registered by more than one hello, an earlier disconnect has
            # already dropped the entry and remove() would raise KeyError
            # inside the disconnect callback.
            self.nodes.discard(node)
            self.furls.discard(furl)
        node.notifyOnDisconnect(_remove)

        self.furls.add(furl)
        node.callRemote("new_peers", self.furls)
        if self._encoding_parameters is not None:
            node.callRemote("set_encoding_parameters",
                            self._encoding_parameters)
        # The newcomer is added to self.nodes only after this loop, so it is
        # not sent its own FURL a second time.
        for othernode in self.nodes:
            othernode.callRemote("new_peers", set([furl]))
        self.nodes.add(node)
|
2007-03-27 23:12:11 +00:00
|
|
|
|
|
|
|
class IntroducerClient(service.Service, Referenceable):
    """Client side of the introducer protocol.

    Connects to the introducer at introducer_furl, announces my_furl to it,
    and opens a connection to every peer FURL the introducer reports.
    """
    implements(RIIntroducerClient)

    def __init__(self, tub, introducer_furl, my_furl):
        """Remember the tub and FURLs; no connection is made until
        startService() is called."""
        self.tub = tub
        self.introducer_furl = introducer_furl
        self.my_furl = my_furl

        self.connections = {} # k: nodeid, v: ref
        self.reconnectors = {} # k: FURL, v: reconnector
        self._connected = False

        self.connection_observers = observer.ObserverList()
        self.encoding_parameters = None

        # The N'th element of _observers_of_enough_peers is None if nobody has
        # asked to be informed when N peers become connected, it is a
        # OneShotObserverList if someone has asked to be informed, and that
        # list is fired when N peers next become connected (or immediately if
        # N peers are already connected when someone asks), and the N'th
        # element is replaced by None when the number of connected peers falls
        # below N. _observers_of_enough_peers is always just long enough to
        # hold the highest-numbered N that anyone is interested in (i.e.,
        # there are never trailing Nones in _observers_of_enough_peers).
        self._observers_of_enough_peers = []

    def startService(self):
        """Start connecting to the introducer.

        connectTo() supplies the reconnecting connection that drives
        _got_introducer. The separate getReference() call exists only so
        that a failure of the initial attempt gets logged.
        """
        service.Service.startService(self)
        self.introducer_reconnector = self.tub.connectTo(self.introducer_furl,
                                                         self._got_introducer)

        def connect_failed(failure):
            self.log("\n\nInitial Introducer connection failed: "
                     "perhaps it's down\n")
            self.log(str(failure))
        d = self.tub.getReference(self.introducer_furl)
        d.addErrback(connect_failed)

    def log(self, msg):
        """Forward msg to the parent service's log() method."""
        self.parent.log(msg)

    def remote_new_peers(self, furls):
        """Remotely callable: start connecting to each FURL in furls."""
        for furl in furls:
            self._new_peer(furl)

    def remote_set_encoding_parameters(self, parameters):
        """Remotely callable: store the encoding parameters that were sent."""
        self.encoding_parameters = parameters

    def stopService(self):
        """Stop the introducer reconnector and every peer reconnector."""
        service.Service.stopService(self)
        self.introducer_reconnector.stopConnecting()
        for reconnector in self.reconnectors.itervalues():
            reconnector.stopConnecting()

    def _new_peer(self, furl):
        """Begin (re)connecting to the peer at furl unless already doing so.

        The nodeid is the base32-decoded tub id embedded in the FURL. An
        AssertionError is raised for a FURL that does not start with
        "pb://<tubid>@".
        """
        if furl in self.reconnectors:
            return
        # TODO: rather than using the TubID as a nodeid, we should use
        # something else. The thing that requires the least additional
        # mappings is to use the foolscap "identifier" (the last component of
        # the furl), since these are unguessable. Before we can do that,
        # though, we need a way to conveniently make these identifiers
        # persist from one run of the client program to the next. Also, using
        # the foolscap identifier would mean that anyone who knows the name
        # of the node also has all the secrets they need to contact and use
        # them, which may or may not be what we want.
        m = re.match(r'pb://(\w+)@', furl)
        assert m, furl
        nodeid = b32decode(m.group(1).upper())
        # Abbreviated printable form of the nodeid, used only for logging.
        short_id = b32encode(nodeid).lower()[:8]

        def _got_peer(rref):
            self.log("connected to %s" % short_id)
            self.connection_observers.notify(nodeid, rref)
            self.connections[nodeid] = rref
            self._fire_observers_of_enough_peers()

            def _lost():
                # TODO: notifyOnDisconnect uses eventually(), but connects do
                # not. Could this cause a problem?
                del self.connections[nodeid]
                self._reset_observers_of_enough_peers()
            rref.notifyOnDisconnect(_lost)

        self.log("connecting to %s" % short_id)
        self.reconnectors[furl] = self.tub.connectTo(furl, _got_peer)

    def _fire_observers_of_enough_peers(self):
        """Fire the observer list registered for the current peer count.

        Called right after a connection has been added. Observers fired on
        this path receive None as their result.
        """
        numpeers = len(self.connections)
        if len(self._observers_of_enough_peers) > numpeers:
            osol = self._observers_of_enough_peers[numpeers]
            if osol is not None:
                osol.fire(None)

    def _reset_observers_of_enough_peers(self):
        """Drop the observer list whose threshold is no longer met.

        Called right after a connection has been removed. The peer count
        has just fallen below len(self.connections)+1, so that entry is
        replaced by None and trailing Nones are trimmed, as described in
        __init__.
        """
        observers = self._observers_of_enough_peers
        unmet = len(self.connections) + 1
        if len(observers) > unmet:
            # We know that this observer list must have been fired, since
            # we had enough peers before this one was lost. Replacing it
            # (by index, never by deleting, which would shift every later
            # threshold) lets the next when_enough_peers() caller wait for
            # the count to be reached again.
            observers[unmet] = None
        while observers and observers[-1] is None:
            observers.pop()

    def _got_introducer(self, introducer):
        """Announce ourselves to the introducer we have just connected to."""
        self.log("introducing ourselves: %s, %s" % (self, self.my_furl[6:13]))
        self._connected = True
        introducer.callRemote("hello",
                              node=self,
                              furl=self.my_furl)
        introducer.notifyOnDisconnect(self._disconnected)

    def _disconnected(self):
        """Record that the connection to the introducer has been lost."""
        self.log("bummer, we've lost our connection to the introducer")
        self._connected = False

    def notify_on_new_connection(self, cb):
        """Register a callback that will be fired (with nodeid, rref) when
        a new connection is established."""
        self.connection_observers.subscribe(cb)

    def connected_to_introducer(self):
        """Return True while we hold a connection to the introducer."""
        return self._connected

    def get_all_peerids(self):
        """Return an iterator over the nodeids of all connected peers."""
        return self.connections.iterkeys()

    def get_all_peers(self):
        """Return an iterator over (nodeid, rref) pairs of connected peers."""
        return self.connections.iteritems()

    def when_enough_peers(self, numpeers):
        """
        I return a deferred that fires the next time that at least numpeers
        are connected, or fires immediately if numpeers are currently
        available.

        The deferred's result is this IntroducerClient when it fires
        immediately, and None when it is fired later by a new connection.
        """
        observers = self._observers_of_enough_peers
        # Grow the list so that index numpeers exists (no-op if it already
        # does).
        observers.extend([None] * (numpeers + 1 - len(observers)))
        if observers[numpeers] is None:
            observers[numpeers] = observer.OneShotObserverList()
            # Fire only the list we have just created: an existing list for
            # a threshold that is currently met has already been fired and
            # must not be fired a second time.
            if len(self.connections) >= numpeers:
                observers[numpeers].fire(self)
        return observers[numpeers].when_fired()
|