mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-04-08 03:14:21 +00:00
Merge pr281: load YAML cache when connection fails
refs ticket:2788
This commit is contained in:
commit
73b08d2a54
@ -13,6 +13,9 @@ from allmydata.util import log
|
||||
from allmydata.util.rrefutil import add_version_to_remote_reference
|
||||
from allmydata.util.keyutil import BadSignatureError
|
||||
|
||||
class InvalidCacheError(Exception):
|
||||
pass
|
||||
|
||||
class WrapV2ClientInV1Interface(Referenceable): # for_v1
|
||||
"""I wrap a v2 IntroducerClient to make it look like a v1 client, so it
|
||||
can be attached to an old server."""
|
||||
@ -111,9 +114,34 @@ class IntroducerClient(service.Service, Referenceable):
|
||||
def connect_failed(failure):
|
||||
self.log("Initial Introducer connection failed: perhaps it's down",
|
||||
level=log.WEIRD, failure=failure, umid="c5MqUQ")
|
||||
self._load_announcements()
|
||||
d = self._tub.getReference(self.introducer_furl)
|
||||
d.addErrback(connect_failed)
|
||||
|
||||
def _load_announcements(self):
|
||||
# Announcements contain unicode, because they come from JSON. We tell
|
||||
# PyYAML to give us unicode instead of str/bytes.
|
||||
def construct_unicode(loader, node):
|
||||
return node.value
|
||||
yaml.SafeLoader.add_constructor("tag:yaml.org,2002:str",
|
||||
construct_unicode)
|
||||
try:
|
||||
with self._cache_filepath.open() as f:
|
||||
servers = yaml.safe_load(f)
|
||||
except EnvironmentError:
|
||||
return # no cache file
|
||||
if not isinstance(servers, list):
|
||||
log.err(InvalidCacheError("not a list"), level=log.WEIRD)
|
||||
return
|
||||
self.log("Using server data from cache", level=log.UNUSUAL)
|
||||
for server_params in servers:
|
||||
if not isinstance(server_params, dict):
|
||||
log.err(InvalidCacheError("not a dict: %r" % (server_params,)),
|
||||
level=log.WEIRD)
|
||||
continue
|
||||
self._deliver_announcements(server_params['key_s'],
|
||||
server_params['ann'])
|
||||
|
||||
def _save_announcements(self):
|
||||
announcements = []
|
||||
for _, value in self._inbound_announcements.items():
|
||||
@ -123,7 +151,7 @@ class IntroducerClient(service.Service, Referenceable):
|
||||
"key_s" : key_s,
|
||||
}
|
||||
announcements.append(server_params)
|
||||
announcement_cache_yaml = yaml.dump(announcements)
|
||||
announcement_cache_yaml = yaml.safe_dump(announcements)
|
||||
self._cache_filepath.setContent(announcement_cache_yaml)
|
||||
|
||||
def _got_introducer(self, publisher):
|
||||
@ -359,6 +387,10 @@ class IntroducerClient(service.Service, Referenceable):
|
||||
self._save_announcements()
|
||||
# note: we never forget an index, but we might update its value
|
||||
|
||||
self._deliver_announcements(key_s, ann)
|
||||
|
||||
def _deliver_announcements(self, key_s, ann):
|
||||
service_name = str(ann["service-name"])
|
||||
for (service_name2,cb,args,kwargs) in self._local_subscribers:
|
||||
if service_name2 == service_name:
|
||||
eventually(cb, key_s, ann, *args, **kwargs)
|
||||
|
@ -1008,7 +1008,16 @@ class Announcements(unittest.TestCase):
|
||||
self.failUnlessEqual(a[0].version, "my_version")
|
||||
self.failUnlessEqual(a[0].announcement["anonymous-storage-FURL"], furl1)
|
||||
|
||||
def test_client_cache_1(self):
|
||||
def _load_cache(self, cache_filepath):
|
||||
def construct_unicode(loader, node):
|
||||
return node.value
|
||||
yaml.SafeLoader.add_constructor("tag:yaml.org,2002:str",
|
||||
construct_unicode)
|
||||
with cache_filepath.open() as f:
|
||||
return yaml.safe_load(f)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_client_cache(self):
|
||||
basedir = "introducer/ClientSeqnums/test_client_cache_1"
|
||||
fileutil.make_dirs(basedir)
|
||||
cache_filepath = FilePath(os.path.join(basedir, "private",
|
||||
@ -1029,23 +1038,77 @@ class Announcements(unittest.TestCase):
|
||||
ic = c.introducer_client
|
||||
sk_s, vk_s = keyutil.make_keypair()
|
||||
sk, _ignored = keyutil.parse_privkey(sk_s)
|
||||
keyutil.remove_prefix(vk_s, "pub-v0-")
|
||||
pub1 = keyutil.remove_prefix(vk_s, "pub-")
|
||||
furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
|
||||
ann_t = make_ann_t(ic, furl1, sk, 1)
|
||||
|
||||
ic.got_announcements([ann_t])
|
||||
|
||||
# check the cache for the announcement
|
||||
with cache_filepath.open() as f:
|
||||
def constructor(loader, node):
|
||||
return node.value
|
||||
yaml.SafeLoader.add_constructor("tag:yaml.org,2002:python/unicode", constructor)
|
||||
announcements = yaml.safe_load(f)
|
||||
f.close()
|
||||
|
||||
announcements = self._load_cache(cache_filepath)
|
||||
self.failUnlessEqual(len(announcements), 1)
|
||||
self.failUnlessEqual("pub-" + announcements[0]['key_s'], vk_s)
|
||||
self.failUnlessEqual(announcements[0]['key_s'], pub1)
|
||||
ann = announcements[0]["ann"]
|
||||
self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
|
||||
self.failUnlessEqual(ann["seqnum"], 1)
|
||||
|
||||
# a new announcement that replaces the first should replace the
|
||||
# cached entry, not duplicate it
|
||||
furl2 = furl1 + "er"
|
||||
ann_t2 = make_ann_t(ic, furl2, sk, 2)
|
||||
ic.got_announcements([ann_t2])
|
||||
announcements = self._load_cache(cache_filepath)
|
||||
self.failUnlessEqual(len(announcements), 1)
|
||||
self.failUnlessEqual(announcements[0]['key_s'], pub1)
|
||||
ann = announcements[0]["ann"]
|
||||
self.failUnlessEqual(ann["anonymous-storage-FURL"], furl2)
|
||||
self.failUnlessEqual(ann["seqnum"], 2)
|
||||
|
||||
# but a third announcement with a different key should add to the
|
||||
# cache
|
||||
sk_s2, vk_s2 = keyutil.make_keypair()
|
||||
sk2, _ignored = keyutil.parse_privkey(sk_s2)
|
||||
pub2 = keyutil.remove_prefix(vk_s2, "pub-")
|
||||
furl3 = "pb://onug64tu@127.0.0.1:456/short"
|
||||
ann_t3 = make_ann_t(ic, furl3, sk2, 1)
|
||||
ic.got_announcements([ann_t3])
|
||||
|
||||
announcements = self._load_cache(cache_filepath)
|
||||
self.failUnlessEqual(len(announcements), 2)
|
||||
self.failUnlessEqual(set([pub1, pub2]),
|
||||
set([a["key_s"] for a in announcements]))
|
||||
self.failUnlessEqual(set([furl2, furl3]),
|
||||
set([a["ann"]["anonymous-storage-FURL"]
|
||||
for a in announcements]))
|
||||
|
||||
# test loading
|
||||
ic2 = IntroducerClient(None, "introducer.furl", u"my_nickname",
|
||||
"my_version", "oldest_version", {}, fakeseq,
|
||||
ic._cache_filepath)
|
||||
announcements = {}
|
||||
def got(key_s, ann):
|
||||
announcements[key_s] = ann
|
||||
ic2.subscribe_to("storage", got)
|
||||
ic2._load_announcements() # normally happens when connection fails
|
||||
yield flushEventualQueue()
|
||||
|
||||
self.failUnless(pub1 in announcements)
|
||||
self.failUnlessEqual(announcements[pub1]["anonymous-storage-FURL"],
|
||||
furl2)
|
||||
self.failUnlessEqual(announcements[pub2]["anonymous-storage-FURL"],
|
||||
furl3)
|
||||
|
||||
class YAMLUnicode(unittest.TestCase):
|
||||
def test_convert(self):
|
||||
data = yaml.safe_dump(["str", u"unicode", u"\u1234nicode"])
|
||||
def construct_unicode(loader, node):
|
||||
return node.value
|
||||
yaml.SafeLoader.add_constructor("tag:yaml.org,2002:str",
|
||||
construct_unicode)
|
||||
back = yaml.safe_load(data)
|
||||
self.failUnlessEqual(type(back[0]), unicode)
|
||||
self.failUnlessEqual(type(back[1]), unicode)
|
||||
self.failUnlessEqual(type(back[2]), unicode)
|
||||
|
||||
class ClientSeqnums(unittest.TestCase):
|
||||
def test_client(self):
|
||||
|
Loading…
x
Reference in New Issue
Block a user