mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-03-23 04:05:15 +00:00
#538: fetch version and attach to the rref. Make IntroducerClient demand v1 support.
This commit is contained in:
parent
0eb6b324a4
commit
bf06492a90
@ -16,6 +16,7 @@ from allmydata import storage, hashtree, uri
|
||||
from allmydata.immutable import encode
|
||||
from allmydata.util import base32, idlib, mathutil
|
||||
from allmydata.util.assertutil import precondition
|
||||
from allmydata.util.rrefutil import get_versioned_remote_reference
|
||||
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
|
||||
IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, NotEnoughSharesError
|
||||
from allmydata.immutable import layout
|
||||
@ -1216,8 +1217,18 @@ class Uploader(service.MultiService):
|
||||
self._got_helper)
|
||||
|
||||
def _got_helper(self, helper):
|
||||
log.msg("got helper connection, getting versions")
|
||||
default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
|
||||
{ },
|
||||
"application-version": "unknown: no get_version()",
|
||||
}
|
||||
d = get_versioned_remote_reference(helper, default)
|
||||
d.addCallback(self._got_versioned_helper)
|
||||
|
||||
def _got_versioned_helper(self, helper):
|
||||
self._helper = helper
|
||||
helper.notifyOnDisconnect(self._lost_helper)
|
||||
|
||||
def _lost_helper(self):
|
||||
self._helper = None
|
||||
|
||||
@ -1225,6 +1236,7 @@ class Uploader(service.MultiService):
|
||||
# return a tuple of (helper_furl_or_None, connected_bool)
|
||||
return (self._helper_furl, bool(self._helper))
|
||||
|
||||
|
||||
def upload(self, uploadable):
|
||||
# this returns the URI
|
||||
assert self.parent
|
||||
|
@ -2156,3 +2156,11 @@ class RIKeyGenerator(RemoteInterface):
|
||||
class FileTooLargeError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class InsufficientVersionError(Exception):
|
||||
def __init__(self, needed, got):
|
||||
self.needed = needed
|
||||
self.got = got
|
||||
def __repr__(self):
|
||||
return "InsufficientVersionError(need '%s', got %s)" % (self.needed,
|
||||
self.got)
|
||||
|
@ -4,9 +4,11 @@ from base64 import b32decode
|
||||
from zope.interface import implements
|
||||
from twisted.application import service
|
||||
from foolscap import Referenceable
|
||||
from allmydata.interfaces import InsufficientVersionError
|
||||
from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \
|
||||
IIntroducerClient
|
||||
from allmydata.util import log, idlib
|
||||
from allmydata.util.rrefutil import get_versioned_remote_reference
|
||||
from allmydata.introducer.common import make_index
|
||||
|
||||
|
||||
@ -28,6 +30,14 @@ class RemoteServiceConnector:
|
||||
@ivar remote_host: the IAddress, if connected, otherwise None
|
||||
"""
|
||||
|
||||
VERSION_DEFAULTS = {
|
||||
"storage": { "http://allmydata.org/tahoe/protocols/storage/v1" :
|
||||
{ "maximum-immutable-share-size": 2**32 },
|
||||
"application-version": "unknown: no get_version()",
|
||||
},
|
||||
"stub_client": { },
|
||||
}
|
||||
|
||||
def __init__(self, announcement, tub, ic):
|
||||
self._tub = tub
|
||||
self._announcement = announcement
|
||||
@ -62,11 +72,19 @@ class RemoteServiceConnector:
|
||||
self._reconnector.stopConnecting()
|
||||
|
||||
def _got_service(self, rref):
|
||||
self.log("got connection to %s, getting versions" % self._nodeid_s)
|
||||
|
||||
default = self.VERSION_DEFAULTS.get(self.service_name, {})
|
||||
d = get_versioned_remote_reference(rref, default)
|
||||
d.addCallback(self._got_versioned_service)
|
||||
|
||||
def _got_versioned_service(self, rref):
|
||||
self.log("connected to %s, version %s" % (self._nodeid_s, rref.version))
|
||||
|
||||
self.last_connect_time = time.time()
|
||||
self.remote_host = rref.tracker.broker.transport.getPeer()
|
||||
self.remote_host = rref.rref.tracker.broker.transport.getPeer()
|
||||
|
||||
self.rref = rref
|
||||
self.log("connected to %s" % self._nodeid_s)
|
||||
|
||||
self._ic.add_connection(self._nodeid, self.service_name, rref)
|
||||
|
||||
@ -79,6 +97,7 @@ class RemoteServiceConnector:
|
||||
self.remote_host = None
|
||||
self._ic.remove_connection(self._nodeid, self.service_name, rref)
|
||||
|
||||
|
||||
def reset(self):
|
||||
self._reconnector.reset()
|
||||
|
||||
@ -119,6 +138,7 @@ class IntroducerClient(service.Service, Referenceable):
|
||||
|
||||
def startService(self):
|
||||
service.Service.startService(self)
|
||||
self._introducer_error = None
|
||||
rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
|
||||
self._introducer_reconnector = rc
|
||||
def connect_failed(failure):
|
||||
@ -128,7 +148,25 @@ class IntroducerClient(service.Service, Referenceable):
|
||||
d.addErrback(connect_failed)
|
||||
|
||||
def _got_introducer(self, publisher):
|
||||
self.log("connected to introducer")
|
||||
self.log("connected to introducer, getting versions")
|
||||
default = { "http://allmydata.org/tahoe/protocols/introducer/v1":
|
||||
{ },
|
||||
"application-version": "unknown: no get_version()",
|
||||
}
|
||||
d = get_versioned_remote_reference(publisher, default)
|
||||
d.addCallback(self._got_versioned_introducer)
|
||||
d.addErrback(self._got_error)
|
||||
|
||||
def _got_error(self, f):
|
||||
# TODO: for the introducer, perhaps this should halt the application
|
||||
self._introducer_error = f # polled by tests
|
||||
|
||||
def _got_versioned_introducer(self, publisher):
|
||||
self.log("got introducer version: %s" % (publisher.version,))
|
||||
# we require a V1 introducer
|
||||
needed = "http://allmydata.org/tahoe/protocols/introducer/v1"
|
||||
if needed not in publisher.version:
|
||||
raise InsufficientVersionError(needed, publisher.version)
|
||||
self._connected = True
|
||||
self._publisher = publisher
|
||||
publisher.notifyOnDisconnect(self._disconnected)
|
||||
@ -258,7 +296,7 @@ class IntroducerClient(service.Service, Referenceable):
|
||||
if c[1] == service_name])
|
||||
|
||||
def get_permuted_peers(self, service_name, key):
|
||||
"""Return an ordered list of (peerid, rref) tuples."""
|
||||
"""Return an ordered list of (peerid, versioned-rref) tuples."""
|
||||
|
||||
results = []
|
||||
for (c_peerid, c_service_name, rref) in self._connections:
|
||||
|
@ -74,6 +74,15 @@ def flush_but_dont_ignore(res):
|
||||
d.addCallback(_done)
|
||||
return d
|
||||
|
||||
def wait_a_few_turns(ignored=None):
|
||||
d = eventual.fireEventually()
|
||||
d.addCallback(eventual.fireEventually)
|
||||
d.addCallback(eventual.fireEventually)
|
||||
d.addCallback(eventual.fireEventually)
|
||||
d.addCallback(eventual.fireEventually)
|
||||
d.addCallback(eventual.fireEventually)
|
||||
return d
|
||||
|
||||
def upload_data(uploader, data, convergence):
|
||||
u = upload.Data(data, convergence=convergence)
|
||||
return uploader.upload(u)
|
||||
@ -110,10 +119,7 @@ class AssistedUpload(unittest.TestCase):
|
||||
u = upload.Uploader(self.helper_furl)
|
||||
u.setServiceParent(self.s)
|
||||
|
||||
# wait a few turns
|
||||
d = eventual.fireEventually()
|
||||
d.addCallback(eventual.fireEventually)
|
||||
d.addCallback(eventual.fireEventually)
|
||||
d = wait_a_few_turns()
|
||||
|
||||
def _ready(res):
|
||||
assert u._helper
|
||||
@ -164,10 +170,7 @@ class AssistedUpload(unittest.TestCase):
|
||||
u = upload.Uploader(self.helper_furl)
|
||||
u.setServiceParent(self.s)
|
||||
|
||||
# wait a few turns
|
||||
d = eventual.fireEventually()
|
||||
d.addCallback(eventual.fireEventually)
|
||||
d.addCallback(eventual.fireEventually)
|
||||
d = wait_a_few_turns()
|
||||
|
||||
def _ready(res):
|
||||
assert u._helper
|
||||
@ -194,10 +197,7 @@ class AssistedUpload(unittest.TestCase):
|
||||
u = upload.Uploader(self.helper_furl)
|
||||
u.setServiceParent(self.s)
|
||||
|
||||
# wait a few turns
|
||||
d = eventual.fireEventually()
|
||||
d.addCallback(eventual.fireEventually)
|
||||
d.addCallback(eventual.fireEventually)
|
||||
d = wait_a_few_turns()
|
||||
|
||||
def _ready(res):
|
||||
assert u._helper
|
||||
|
@ -9,6 +9,7 @@ from twisted.python import log
|
||||
from foolscap import Tub, Referenceable
|
||||
from foolscap.eventual import fireEventually, flushEventualQueue
|
||||
from twisted.application import service
|
||||
from allmydata.interfaces import InsufficientVersionError
|
||||
from allmydata.introducer.client import IntroducerClient
|
||||
from allmydata.introducer.server import IntroducerService
|
||||
# test compatibility with old introducer .tac files
|
||||
@ -230,3 +231,44 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
|
||||
self.failIf(c.connected_to_introducer())
|
||||
d.addCallback(_check4)
|
||||
return d
|
||||
|
||||
class TooNewServer(IntroducerService):
|
||||
VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v999":
|
||||
{ },
|
||||
"application-version": "greetings from the crazy future",
|
||||
}
|
||||
|
||||
class NonV1Server(SystemTestMixin, unittest.TestCase):
|
||||
# if the 1.3.0 client connects to a server that doesn't provide the 'v1'
|
||||
# protocol, it is supposed to provide a useful error instead of a weird
|
||||
# exception.
|
||||
|
||||
def test_failure(self):
|
||||
i = TooNewServer()
|
||||
i.setServiceParent(self.parent)
|
||||
self.introducer_furl = self.central_tub.registerReference(i)
|
||||
|
||||
tub = Tub()
|
||||
tub.setServiceParent(self.parent)
|
||||
l = tub.listenOn("tcp:0")
|
||||
portnum = l.getPortnum()
|
||||
tub.setLocation("localhost:%d" % portnum)
|
||||
|
||||
n = FakeNode()
|
||||
c = IntroducerClient(tub, self.introducer_furl,
|
||||
"nickname-client", "version", "oldest")
|
||||
c.subscribe_to("storage")
|
||||
|
||||
c.setServiceParent(self.parent)
|
||||
|
||||
# now we wait for it to connect and notice the bad version
|
||||
|
||||
def _got_bad():
|
||||
return bool(c._introducer_error) or bool(c._publisher)
|
||||
d = self.poll(_got_bad)
|
||||
def _done(res):
|
||||
self.failUnless(c._introducer_error)
|
||||
self.failUnless(c._introducer_error.check(InsufficientVersionError))
|
||||
d.addCallback(_done)
|
||||
return d
|
||||
|
||||
|
32
src/allmydata/util/rrefutil.py
Normal file
32
src/allmydata/util/rrefutil.py
Normal file
@ -0,0 +1,32 @@
|
||||
|
||||
from foolscap.tokens import Violation
|
||||
|
||||
class VersionedRemoteReference:
|
||||
"""I wrap a RemoteReference, and add a .version attribute."""
|
||||
|
||||
def __init__(self, original, version):
|
||||
self.rref = original
|
||||
self.version = version
|
||||
|
||||
def callRemote(self, *args, **kwargs):
|
||||
return self.rref.callRemote(*args, **kwargs)
|
||||
|
||||
def callRemoteOnly(self, *args, **kwargs):
|
||||
return self.rref.callRemoteOnly(*args, **kwargs)
|
||||
|
||||
def notifyOnDisconnect(self, *args, **kwargs):
|
||||
return self.rref.notifyOnDisconnect(*args, **kwargs)
|
||||
|
||||
|
||||
def get_versioned_remote_reference(rref, default):
|
||||
"""I return a Deferred that fires with a VersionedRemoteReference"""
|
||||
d = rref.callRemote("get_version")
|
||||
def _no_get_version(f):
|
||||
f.trap(Violation, AttributeError)
|
||||
return default
|
||||
d.addErrback(_no_get_version)
|
||||
def _got_version(version):
|
||||
return VersionedRemoteReference(rref, version)
|
||||
d.addCallback(_got_version)
|
||||
return d
|
||||
|
Loading…
x
Reference in New Issue
Block a user