mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-04-27 14:30:19 +00:00
Merge pull request #936 from LeastAuthority/3534.test_introducer-no-mock
Remove mock from test_introducer Fixes: ticket:3534
This commit is contained in:
commit
7a504dd99f
0
newsfragments/3534.minor
Normal file
0
newsfragments/3534.minor
Normal file
@ -16,7 +16,7 @@ from six import ensure_text, ensure_str
|
|||||||
import time
|
import time
|
||||||
from zope.interface import implementer
|
from zope.interface import implementer
|
||||||
from twisted.application import service
|
from twisted.application import service
|
||||||
from foolscap.api import Referenceable, eventually
|
from foolscap.api import Referenceable
|
||||||
from allmydata.interfaces import InsufficientVersionError
|
from allmydata.interfaces import InsufficientVersionError
|
||||||
from allmydata.introducer.interfaces import IIntroducerClient, \
|
from allmydata.introducer.interfaces import IIntroducerClient, \
|
||||||
RIIntroducerSubscriberClient_v2
|
RIIntroducerSubscriberClient_v2
|
||||||
@ -24,6 +24,9 @@ from allmydata.introducer.common import sign_to_foolscap, unsign_from_foolscap,\
|
|||||||
get_tubid_string_from_ann
|
get_tubid_string_from_ann
|
||||||
from allmydata.util import log, yamlutil, connection_status
|
from allmydata.util import log, yamlutil, connection_status
|
||||||
from allmydata.util.rrefutil import add_version_to_remote_reference
|
from allmydata.util.rrefutil import add_version_to_remote_reference
|
||||||
|
from allmydata.util.observer import (
|
||||||
|
ObserverList,
|
||||||
|
)
|
||||||
from allmydata.crypto.error import BadSignature
|
from allmydata.crypto.error import BadSignature
|
||||||
from allmydata.util.assertutil import precondition
|
from allmydata.util.assertutil import precondition
|
||||||
|
|
||||||
@ -62,8 +65,7 @@ class IntroducerClient(service.Service, Referenceable):
|
|||||||
self._publisher = None
|
self._publisher = None
|
||||||
self._since = None
|
self._since = None
|
||||||
|
|
||||||
self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
|
self._local_subscribers = {} # {servicename: ObserverList}
|
||||||
self._subscribed_service_names = set()
|
|
||||||
self._subscriptions = set() # requests we've actually sent
|
self._subscriptions = set() # requests we've actually sent
|
||||||
|
|
||||||
# _inbound_announcements remembers one announcement per
|
# _inbound_announcements remembers one announcement per
|
||||||
@ -177,21 +179,21 @@ class IntroducerClient(service.Service, Referenceable):
|
|||||||
return log.msg(*args, **kwargs)
|
return log.msg(*args, **kwargs)
|
||||||
|
|
||||||
def subscribe_to(self, service_name, cb, *args, **kwargs):
|
def subscribe_to(self, service_name, cb, *args, **kwargs):
|
||||||
self._local_subscribers.append( (service_name,cb,args,kwargs) )
|
obs = self._local_subscribers.setdefault(service_name, ObserverList())
|
||||||
self._subscribed_service_names.add(service_name)
|
obs.subscribe(lambda key_s, ann: cb(key_s, ann, *args, **kwargs))
|
||||||
self._maybe_subscribe()
|
self._maybe_subscribe()
|
||||||
for index,(ann,key_s,when) in list(self._inbound_announcements.items()):
|
for index,(ann,key_s,when) in list(self._inbound_announcements.items()):
|
||||||
precondition(isinstance(key_s, bytes), key_s)
|
precondition(isinstance(key_s, bytes), key_s)
|
||||||
servicename = index[0]
|
servicename = index[0]
|
||||||
if servicename == service_name:
|
if servicename == service_name:
|
||||||
eventually(cb, key_s, ann, *args, **kwargs)
|
obs.notify(key_s, ann)
|
||||||
|
|
||||||
def _maybe_subscribe(self):
|
def _maybe_subscribe(self):
|
||||||
if not self._publisher:
|
if not self._publisher:
|
||||||
self.log("want to subscribe, but no introducer yet",
|
self.log("want to subscribe, but no introducer yet",
|
||||||
level=log.NOISY)
|
level=log.NOISY)
|
||||||
return
|
return
|
||||||
for service_name in self._subscribed_service_names:
|
for service_name in self._local_subscribers:
|
||||||
if service_name in self._subscriptions:
|
if service_name in self._subscriptions:
|
||||||
continue
|
continue
|
||||||
self._subscriptions.add(service_name)
|
self._subscriptions.add(service_name)
|
||||||
@ -270,7 +272,7 @@ class IntroducerClient(service.Service, Referenceable):
|
|||||||
precondition(isinstance(key_s, bytes), key_s)
|
precondition(isinstance(key_s, bytes), key_s)
|
||||||
self._debug_counts["inbound_announcement"] += 1
|
self._debug_counts["inbound_announcement"] += 1
|
||||||
service_name = str(ann["service-name"])
|
service_name = str(ann["service-name"])
|
||||||
if service_name not in self._subscribed_service_names:
|
if service_name not in self._local_subscribers:
|
||||||
self.log("announcement for a service we don't care about [%s]"
|
self.log("announcement for a service we don't care about [%s]"
|
||||||
% (service_name,), level=log.UNUSUAL, umid="dIpGNA")
|
% (service_name,), level=log.UNUSUAL, umid="dIpGNA")
|
||||||
self._debug_counts["wrong_service"] += 1
|
self._debug_counts["wrong_service"] += 1
|
||||||
@ -341,9 +343,9 @@ class IntroducerClient(service.Service, Referenceable):
|
|||||||
def _deliver_announcements(self, key_s, ann):
|
def _deliver_announcements(self, key_s, ann):
|
||||||
precondition(isinstance(key_s, bytes), key_s)
|
precondition(isinstance(key_s, bytes), key_s)
|
||||||
service_name = str(ann["service-name"])
|
service_name = str(ann["service-name"])
|
||||||
for (service_name2,cb,args,kwargs) in self._local_subscribers:
|
obs = self._local_subscribers.get(service_name)
|
||||||
if service_name2 == service_name:
|
if obs is not None:
|
||||||
eventually(cb, key_s, ann, *args, **kwargs)
|
obs.notify(key_s, ann)
|
||||||
|
|
||||||
def connection_status(self):
|
def connection_status(self):
|
||||||
assert self.running # startService builds _introducer_reconnector
|
assert self.running # startService builds _introducer_reconnector
|
||||||
|
@ -15,7 +15,12 @@ from six import ensure_binary, ensure_text
|
|||||||
import os, re, itertools
|
import os, re, itertools
|
||||||
from base64 import b32decode
|
from base64 import b32decode
|
||||||
import json
|
import json
|
||||||
from mock import Mock, patch
|
from operator import (
|
||||||
|
setitem,
|
||||||
|
)
|
||||||
|
from functools import (
|
||||||
|
partial,
|
||||||
|
)
|
||||||
|
|
||||||
from testtools.matchers import (
|
from testtools.matchers import (
|
||||||
Is,
|
Is,
|
||||||
@ -84,7 +89,8 @@ class Node(testutil.SignalMixin, testutil.ReallyEqualMixin, AsyncTestCase):
|
|||||||
|
|
||||||
def test_introducer_clients_unloadable(self):
|
def test_introducer_clients_unloadable(self):
|
||||||
"""
|
"""
|
||||||
Error if introducers.yaml exists but we can't read it
|
``create_introducer_clients`` raises ``EnvironmentError`` if
|
||||||
|
``introducers.yaml`` exists but we can't read it.
|
||||||
"""
|
"""
|
||||||
basedir = u"introducer.IntroducerNode.test_introducer_clients_unloadable"
|
basedir = u"introducer.IntroducerNode.test_introducer_clients_unloadable"
|
||||||
os.mkdir(basedir)
|
os.mkdir(basedir)
|
||||||
@ -94,17 +100,10 @@ class Node(testutil.SignalMixin, testutil.ReallyEqualMixin, AsyncTestCase):
|
|||||||
f.write(u'---\n')
|
f.write(u'---\n')
|
||||||
os.chmod(yaml_fname, 0o000)
|
os.chmod(yaml_fname, 0o000)
|
||||||
self.addCleanup(lambda: os.chmod(yaml_fname, 0o700))
|
self.addCleanup(lambda: os.chmod(yaml_fname, 0o700))
|
||||||
# just mocking the yaml failure, as "yamlutil.safe_load" only
|
|
||||||
# returns None on some platforms for unreadable files
|
|
||||||
|
|
||||||
with patch("allmydata.client.yamlutil") as p:
|
config = read_config(basedir, "portnum")
|
||||||
p.safe_load = Mock(return_value=None)
|
with self.assertRaises(EnvironmentError):
|
||||||
|
create_introducer_clients(config, Tub())
|
||||||
fake_tub = Mock()
|
|
||||||
config = read_config(basedir, "portnum")
|
|
||||||
|
|
||||||
with self.assertRaises(EnvironmentError):
|
|
||||||
create_introducer_clients(config, fake_tub)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def test_furl(self):
|
def test_furl(self):
|
||||||
@ -1037,23 +1036,53 @@ class Signatures(SyncTestCase):
|
|||||||
unsign_from_foolscap, (bad_msg, sig, b"v999-key"))
|
unsign_from_foolscap, (bad_msg, sig, b"v999-key"))
|
||||||
|
|
||||||
def test_unsigned_announcement(self):
|
def test_unsigned_announcement(self):
|
||||||
ed25519.verifying_key_from_string(b"pub-v0-wodst6ly4f7i7akt2nxizsmmy2rlmer6apltl56zctn67wfyu5tq")
|
"""
|
||||||
mock_tub = Mock()
|
An incorrectly signed announcement is not delivered to subscribers.
|
||||||
|
"""
|
||||||
|
private_key, public_key = ed25519.create_signing_keypair()
|
||||||
|
public_key_str = ed25519.string_from_verifying_key(public_key)
|
||||||
|
|
||||||
ic = IntroducerClient(
|
ic = IntroducerClient(
|
||||||
mock_tub,
|
Tub(),
|
||||||
"pb://",
|
"pb://",
|
||||||
u"fake_nick",
|
u"fake_nick",
|
||||||
"0.0.0",
|
"0.0.0",
|
||||||
"1.2.3",
|
"1.2.3",
|
||||||
(0, u"i am a nonce"),
|
(0, u"i am a nonce"),
|
||||||
"invalid",
|
FilePath(self.mktemp()),
|
||||||
|
)
|
||||||
|
received = {}
|
||||||
|
ic.subscribe_to("good-stuff", partial(setitem, received))
|
||||||
|
|
||||||
|
# Deliver a good message to prove our test code is valid.
|
||||||
|
ann = {"service-name": "good-stuff", "payload": "hello"}
|
||||||
|
ann_t = sign_to_foolscap(ann, private_key)
|
||||||
|
ic.got_announcements([ann_t])
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
{public_key_str[len("pub-"):]: ann},
|
||||||
|
received,
|
||||||
|
)
|
||||||
|
received.clear()
|
||||||
|
|
||||||
|
# Now deliver one without a valid signature and observe that it isn't
|
||||||
|
# delivered to the subscriber.
|
||||||
|
ann = {"service-name": "good-stuff", "payload": "bad stuff"}
|
||||||
|
(msg, sig, key) = sign_to_foolscap(ann, private_key)
|
||||||
|
# Drop a base32 word from the middle of the key to invalidate the
|
||||||
|
# signature.
|
||||||
|
sig_a = bytearray(sig)
|
||||||
|
sig_a[20:22] = []
|
||||||
|
sig = bytes(sig_a)
|
||||||
|
ann_t = (msg, sig, key)
|
||||||
|
ic.got_announcements([ann_t])
|
||||||
|
|
||||||
|
# The received announcements dict should remain empty because we
|
||||||
|
# should not receive the announcement with the invalid signature.
|
||||||
|
self.assertEqual(
|
||||||
|
{},
|
||||||
|
received,
|
||||||
)
|
)
|
||||||
self.assertEqual(0, ic._debug_counts["inbound_announcement"])
|
|
||||||
ic.got_announcements([
|
|
||||||
(b"message", b"v0-aaaaaaa", b"v0-wodst6ly4f7i7akt2nxizsmmy2rlmer6apltl56zctn67wfyu5tq")
|
|
||||||
])
|
|
||||||
# we should have rejected this announcement due to a bad signature
|
|
||||||
self.assertEqual(0, ic._debug_counts["inbound_announcement"])
|
|
||||||
|
|
||||||
|
|
||||||
# add tests of StorageFarmBroker: if it receives duplicate announcements, it
|
# add tests of StorageFarmBroker: if it receives duplicate announcements, it
|
||||||
|
@ -101,3 +101,56 @@ class Observer(unittest.TestCase):
|
|||||||
d.addCallback(_step2)
|
d.addCallback(_step2)
|
||||||
d.addCallback(_check2)
|
d.addCallback(_check2)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
def test_observer_list_reentrant(self):
|
||||||
|
"""
|
||||||
|
``ObserverList`` is reentrant.
|
||||||
|
"""
|
||||||
|
observed = []
|
||||||
|
|
||||||
|
def observer_one():
|
||||||
|
obs.unsubscribe(observer_one)
|
||||||
|
|
||||||
|
def observer_two():
|
||||||
|
observed.append(None)
|
||||||
|
|
||||||
|
obs = observer.ObserverList()
|
||||||
|
obs.subscribe(observer_one)
|
||||||
|
obs.subscribe(observer_two)
|
||||||
|
obs.notify()
|
||||||
|
|
||||||
|
self.assertEqual([None], observed)
|
||||||
|
|
||||||
|
def test_observer_list_observer_errors(self):
|
||||||
|
"""
|
||||||
|
An error in an earlier observer does not prevent notification from being
|
||||||
|
delivered to a later observer.
|
||||||
|
"""
|
||||||
|
observed = []
|
||||||
|
|
||||||
|
def observer_one():
|
||||||
|
raise Exception("Some problem here")
|
||||||
|
|
||||||
|
def observer_two():
|
||||||
|
observed.append(None)
|
||||||
|
|
||||||
|
obs = observer.ObserverList()
|
||||||
|
obs.subscribe(observer_one)
|
||||||
|
obs.subscribe(observer_two)
|
||||||
|
obs.notify()
|
||||||
|
|
||||||
|
self.assertEqual([None], observed)
|
||||||
|
self.assertEqual(1, len(self.flushLoggedErrors(Exception)))
|
||||||
|
|
||||||
|
def test_observer_list_propagate_keyboardinterrupt(self):
|
||||||
|
"""
|
||||||
|
``KeyboardInterrupt`` escapes ``ObserverList.notify``.
|
||||||
|
"""
|
||||||
|
def observer_one():
|
||||||
|
raise KeyboardInterrupt()
|
||||||
|
|
||||||
|
obs = observer.ObserverList()
|
||||||
|
obs.subscribe(observer_one)
|
||||||
|
|
||||||
|
with self.assertRaises(KeyboardInterrupt):
|
||||||
|
obs.notify()
|
||||||
|
@ -142,7 +142,9 @@ def a2b(cs):
|
|||||||
# Add padding back, to make Python's base64 module happy:
|
# Add padding back, to make Python's base64 module happy:
|
||||||
while (len(cs) * 5) % 8 != 0:
|
while (len(cs) * 5) % 8 != 0:
|
||||||
cs += b"="
|
cs += b"="
|
||||||
return base64.b32decode(cs)
|
# Let newbytes come through and still work on Python 2, where the base64
|
||||||
|
# module gets confused by them.
|
||||||
|
return base64.b32decode(backwardscompat_bytes(cs))
|
||||||
|
|
||||||
|
|
||||||
__all__ = ["b2a", "a2b", "b2a_or_none", "BASE32CHAR_3bits", "BASE32CHAR_1bits", "BASE32CHAR", "BASE32STR_anybytes", "could_be_base32_encoded"]
|
__all__ = ["b2a", "a2b", "b2a_or_none", "BASE32CHAR_3bits", "BASE32CHAR_1bits", "BASE32CHAR", "BASE32STR_anybytes", "could_be_base32_encoded"]
|
||||||
|
@ -16,6 +16,9 @@ if PY2:
|
|||||||
import weakref
|
import weakref
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
from foolscap.api import eventually
|
from foolscap.api import eventually
|
||||||
|
from twisted.logger import (
|
||||||
|
Logger,
|
||||||
|
)
|
||||||
|
|
||||||
"""The idiom we use is for the observed object to offer a method named
|
"""The idiom we use is for the observed object to offer a method named
|
||||||
'when_something', which returns a deferred. That deferred will be fired when
|
'when_something', which returns a deferred. That deferred will be fired when
|
||||||
@ -97,7 +100,10 @@ class LazyOneShotObserverList(OneShotObserverList):
|
|||||||
self._fire(self._get_result())
|
self._fire(self._get_result())
|
||||||
|
|
||||||
class ObserverList(object):
|
class ObserverList(object):
|
||||||
"""A simple class to distribute events to a number of subscribers."""
|
"""
|
||||||
|
Immediately distribute events to a number of subscribers.
|
||||||
|
"""
|
||||||
|
_logger = Logger()
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._watchers = []
|
self._watchers = []
|
||||||
@ -109,8 +115,11 @@ class ObserverList(object):
|
|||||||
self._watchers.remove(observer)
|
self._watchers.remove(observer)
|
||||||
|
|
||||||
def notify(self, *args, **kwargs):
|
def notify(self, *args, **kwargs):
|
||||||
for o in self._watchers:
|
for o in self._watchers[:]:
|
||||||
eventually(o, *args, **kwargs)
|
try:
|
||||||
|
o(*args, **kwargs)
|
||||||
|
except Exception:
|
||||||
|
self._logger.failure("While notifying {o!r}", o=o)
|
||||||
|
|
||||||
class EventStreamObserver(object):
|
class EventStreamObserver(object):
|
||||||
"""A simple class to distribute multiple events to a single subscriber.
|
"""A simple class to distribute multiple events to a single subscriber.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user