import hashlib from mock import Mock from fixtures import ( TempDir, ) from zope.interface.verify import ( verifyObject, ) from twisted.application.service import ( Service, ) from twisted.trial import unittest from twisted.internet.defer import succeed, inlineCallbacks from twisted.python.filepath import ( FilePath, ) from foolscap.api import ( Tub, ) from .common import ( SyncTestCase, UseTestPlugins, MemoryIntroducerClient, ) from .storage_plugin import ( DummyStorageClient, ) from allmydata.util import base32, yamlutil from allmydata.client import ( config_from_string, create_client_from_config, ) from allmydata.storage_client import ( IFoolscapStorageServer, NativeStorageServer, StorageFarmBroker, _FoolscapStorage, _NullStorage, ) from allmydata.interfaces import ( IConnectionStatus, ) SOME_FURL = b"pb://abcde@nowhere/fake" class NativeStorageServerWithVersion(NativeStorageServer): def __init__(self, version): # note: these instances won't work for anything other than # get_available_space() because we don't upcall self.version = version def get_version(self): return self.version class TestNativeStorageServer(unittest.TestCase): def test_get_available_space_new(self): nss = NativeStorageServerWithVersion( { "http://allmydata.org/tahoe/protocols/storage/v1": { "maximum-immutable-share-size": 111, "available-space": 222, } }) self.failUnlessEqual(nss.get_available_space(), 222) def test_get_available_space_old(self): nss = NativeStorageServerWithVersion( { "http://allmydata.org/tahoe/protocols/storage/v1": { "maximum-immutable-share-size": 111, } }) self.failUnlessEqual(nss.get_available_space(), 111) def test_missing_nickname(self): ann = {"anonymous-storage-FURL": "pb://w2hqnbaa25yw4qgcvghl5psa3srpfgw3@tcp:127.0.0.1:51309/vucto2z4fxment3vfxbqecblbf6zyp6x", "permutation-seed-base32": "w2hqnbaa25yw4qgcvghl5psa3srpfgw3", } nss = NativeStorageServer("server_id", ann, None, {}) self.assertEqual(nss.get_nickname(), "") class GetConnectionStatus(unittest.TestCase): """ Tests for ``NativeStorageServer.get_connection_status``. """ def test_unrecognized_announcement(self): """ When ``NativeStorageServer`` is constructed with a storage announcement it doesn't recognize, its ``get_connection_status`` nevertheless returns an object which provides ``IConnectionStatus``. """ # Pretty hard to recognize anything from an empty announcement. ann = {} nss = NativeStorageServer("server_id", ann, Tub, {}) nss.start_connecting(lambda: None) connection_status = nss.get_connection_status() self.assertTrue(IConnectionStatus.providedBy(connection_status)) class UnrecognizedAnnouncement(unittest.TestCase): """ Tests for handling of announcements that aren't recognized and don't use *anonymous-storage-FURL*. Recognition failure is created by making up something completely novel for these tests. In real use, recognition failure would most likely come from an announcement generated by a storage server plugin which is not loaded in the client. """ ann = { u"name": u"tahoe-lafs-testing-v1", u"any-parameter": 12345, } server_id = b"abc" def _tub_maker(self, overrides): return Service() def native_storage_server(self): """ Make a ``NativeStorageServer`` out of an unrecognizable announcement. """ return NativeStorageServer( self.server_id, self.ann, self._tub_maker, {}, ) def test_no_exceptions(self): """ ``NativeStorageServer`` can be instantiated with an unrecognized announcement. """ self.native_storage_server() def test_start_connecting(self): """ ``NativeStorageServer.start_connecting`` does not raise an exception. """ server = self.native_storage_server() server.start_connecting(None) def test_stop_connecting(self): """ ``NativeStorageServer.stop_connecting`` does not raise an exception. """ server = self.native_storage_server() server.start_connecting(None) server.stop_connecting() def test_try_to_connect(self): """ ``NativeStorageServer.try_to_connect`` does not raise an exception. """ server = self.native_storage_server() server.start_connecting(None) server.try_to_connect() def test_various_data_methods(self): """ The data accessors of ``NativeStorageServer`` that depend on the announcement do not raise an exception. """ server = self.native_storage_server() server.get_permutation_seed() server.get_name() server.get_longname() server.get_tubid() server.get_lease_seed() server.get_foolscap_write_enabler_seed() server.get_nickname() class PluginMatchedAnnouncement(SyncTestCase): """ Tests for handling by ``NativeStorageServer`` of storage server announcements that are handled by an ``IFoolscapStoragePlugin``. """ @inlineCallbacks def setUp(self): super(PluginMatchedAnnouncement, self).setUp() tempdir = TempDir() self.useFixture(tempdir) self.basedir = FilePath(tempdir.path) self.basedir.child(u"private").makedirs() self.useFixture(UseTestPlugins()) self.config = config_from_string( self.basedir.asTextMode().path, u"tub.port", b""" [client] introducer.furl = {furl} storage.plugins = tahoe-lafs-dummy-v1 """.format(furl=SOME_FURL), ) self.node = yield create_client_from_config( self.config, _introducer_factory=MemoryIntroducerClient, ) [self.introducer_client] = self.node.introducer_clients def publish(self, server_id, announcement): for subscription in self.introducer_client.subscribed_to: if subscription.service_name == u"storage": subscription.cb( server_id, announcement, *subscription.args, **subscription.kwargs ) def get_storage(self, server_id, node): storage_broker = node.get_storage_broker() native_storage_server = storage_broker.servers[server_id] return native_storage_server._storage def test_ignored_non_enabled_plugin(self): """ An announcement that could be matched by a plugin that is not enabled is not matched. """ server_id = b"v0-abcdef" ann = { u"service-name": u"storage", u"storage-options": [{ # notice how the announcement is for a different storage plugin # than the one that is enabled. u"name": u"tahoe-lafs-dummy-v2", u"storage-server-FURL": SOME_FURL.decode("ascii"), }], } self.publish(server_id, ann) storage = self.get_storage(server_id, self.node) self.assertIsInstance(storage, _NullStorage) def test_enabled_plugin(self): """ An announcement that could be matched by a plugin that is enabled is matched and the plugin's storage client is used. """ server_id = b"v0-abcdef" ann = { u"service-name": u"storage", u"storage-options": [{ # and this announcement is for a plugin with a matching name u"name": u"tahoe-lafs-dummy-v1", u"storage-server-FURL": SOME_FURL.decode("ascii"), }], } self.publish(server_id, ann) storage = self.get_storage(server_id, self.node) self.assertTrue( verifyObject( IFoolscapStorageServer, storage, ), ) self.assertIsInstance(storage.storage_server, DummyStorageClient) class FoolscapStorageServers(unittest.TestCase): """ Tests for implementations of ``IFoolscapStorageServer``. """ def test_null_provider(self): """ Instances of ``_NullStorage`` provide ``IFoolscapStorageServer``. """ self.assertTrue( verifyObject( IFoolscapStorageServer, _NullStorage(), ), ) def test_foolscap_provider(self): """ Instances of ``_FoolscapStorage`` provide ``IFoolscapStorageServer``. """ self.assertTrue( verifyObject( IFoolscapStorageServer, _FoolscapStorage.from_announcement( u"server-id", SOME_FURL, {u"permutation-seed-base32": base32.b2a(b"permutationseed")}, object(), ), ), ) class TestStorageFarmBroker(unittest.TestCase): def test_static_servers(self): broker = StorageFarmBroker(True, lambda h: Mock()) key_s = 'v0-1234-1' servers_yaml = b"""\ storage: v0-1234-1: ann: anonymous-storage-FURL: {furl} permutation-seed-base32: aaaaaaaaaaaaaaaaaaaaaaaa """.format(furl=SOME_FURL) servers = yamlutil.safe_load(servers_yaml) permseed = base32.a2b("aaaaaaaaaaaaaaaaaaaaaaaa") broker.set_static_servers(servers["storage"]) self.failUnlessEqual(len(broker._static_server_ids), 1) s = broker.servers[key_s] self.failUnlessEqual(s.announcement, servers["storage"]["v0-1234-1"]["ann"]) self.failUnlessEqual(s.get_serverid(), key_s) self.assertEqual(s.get_permutation_seed(), permseed) # if the Introducer announces the same thing, we're supposed to # ignore it ann2 = { "service-name": "storage", "anonymous-storage-FURL": "pb://{}@nowhere/fake2".format(base32.b2a(str(1))), "permutation-seed-base32": "bbbbbbbbbbbbbbbbbbbbbbbb", } broker._got_announcement(key_s, ann2) s2 = broker.servers[key_s] self.assertIdentical(s2, s) self.assertEqual(s2.get_permutation_seed(), permseed) def test_static_permutation_seed_pubkey(self): broker = StorageFarmBroker(True, lambda h: Mock()) server_id = "v0-4uazse3xb6uu5qpkb7tel2bm6bpea4jhuigdhqcuvvse7hugtsia" k = "4uazse3xb6uu5qpkb7tel2bm6bpea4jhuigdhqcuvvse7hugtsia" ann = { "anonymous-storage-FURL": SOME_FURL, } broker.set_static_servers({server_id.decode("ascii"): {"ann": ann}}) s = broker.servers[server_id] self.assertEqual(s.get_permutation_seed(), base32.a2b(k)) def test_static_permutation_seed_explicit(self): broker = StorageFarmBroker(True, lambda h: Mock()) server_id = "v0-4uazse3xb6uu5qpkb7tel2bm6bpea4jhuigdhqcuvvse7hugtsia" k = "w5gl5igiexhwmftwzhai5jy2jixn7yx7" ann = { "anonymous-storage-FURL": SOME_FURL, "permutation-seed-base32": k, } broker.set_static_servers({server_id.decode("ascii"): {"ann": ann}}) s = broker.servers[server_id] self.assertEqual(s.get_permutation_seed(), base32.a2b(k)) def test_static_permutation_seed_hashed(self): broker = StorageFarmBroker(True, lambda h: Mock()) server_id = "unparseable" ann = { "anonymous-storage-FURL": SOME_FURL, } broker.set_static_servers({server_id.decode("ascii"): {"ann": ann}}) s = broker.servers[server_id] self.assertEqual(s.get_permutation_seed(), hashlib.sha256(server_id).digest()) @inlineCallbacks def test_threshold_reached(self): introducer = Mock() new_tubs = [] def make_tub(*args, **kwargs): return new_tubs.pop() broker = StorageFarmBroker(True, make_tub) done = broker.when_connected_enough(5) broker.use_introducer(introducer) # subscribes to "storage" to learn of new storage nodes subscribe = introducer.mock_calls[0] self.assertEqual(subscribe[0], 'subscribe_to') self.assertEqual(subscribe[1][0], 'storage') got_announcement = subscribe[1][1] data = { "service-name": "storage", "anonymous-storage-FURL": None, "permutation-seed-base32": "aaaaaaaaaaaaaaaaaaaaaaaa", } def add_one_server(x): data["anonymous-storage-FURL"] = "pb://{}@nowhere/fake".format(base32.b2a(str(x))) tub = Mock() new_tubs.append(tub) got_announcement('v0-1234-{}'.format(x), data) self.assertEqual(tub.mock_calls[-1][0], 'connectTo') got_connection = tub.mock_calls[-1][1][1] rref = Mock() rref.callRemote = Mock(return_value=succeed(1234)) got_connection(rref) # first 4 shouldn't trigger connected_threashold for x in range(4): add_one_server(x) self.assertFalse(done.called) # ...but the 5th *should* trigger the threshold add_one_server(42) # so: the OneShotObserverList only notifies via # foolscap.eventually() -- which forces the Deferred call # through the reactor -- so it's no longer synchronous, # meaning that we have to do "real reactor stuff" for the # Deferred from when_connected_enough() to actually fire. (or # @patch() out the reactor in foolscap.eventually to be a # Clock() so we can advance time ourselves, but ... luckily # eventually() uses 0 as the timeout currently) yield done self.assertTrue(done.called)