# This contains a test harness that creates a full Tahoe grid in a single # process (actually in a single MultiService) which does not use the network. # It does not use an Introducer, and there are no foolscap Tubs. Each storage # server puts real shares on disk, but is accessed through loopback # RemoteReferences instead of over serialized SSL. It is not as complete as # the common.SystemTestMixin framework (which does use the network), but # should be considerably faster: on my laptop, it takes 50-80ms to start up, # whereas SystemTestMixin takes close to 2s. # This should be useful for tests which want to examine and/or manipulate the # uploaded shares, checker/verifier/repairer tests, etc. The clients have no # Tubs, so it is not useful for tests that involve a Helper, a KeyGenerator, # or the control.furl . import os.path import sha from twisted.application import service from twisted.internet import reactor from foolscap.api import Referenceable, fireEventually from base64 import b32encode from allmydata import uri as tahoe_uri from allmydata.client import Client from allmydata.storage.server import StorageServer, storage_index_to_dir from allmydata.util import fileutil, idlib, hashutil, rrefutil from allmydata.introducer.client import RemoteServiceConnector from allmydata.test.common_web import HTTPClientGETFactory class IntentionalError(Exception): pass class Marker: pass class LocalWrapper: def __init__(self, original): self.original = original self.broken = False self.post_call_notifier = None self.disconnectors = {} def callRemoteOnly(self, methname, *args, **kwargs): d = self.callRemote(methname, *args, **kwargs) return None def callRemote(self, methname, *args, **kwargs): # this is ideally a Membrane, but that's too hard. We do a shallow # wrapping of inbound arguments, and per-methodname wrapping of # selected return values. def wrap(a): if isinstance(a, Referenceable): return LocalWrapper(a) else: return a args = tuple([wrap(a) for a in args]) kwargs = dict([(k,wrap(kwargs[k])) for k in kwargs]) def _call(): if self.broken: raise IntentionalError("I was asked to break") meth = getattr(self.original, "remote_" + methname) return meth(*args, **kwargs) d = fireEventually() d.addCallback(lambda res: _call()) def _return_membrane(res): # rather than complete the difficult task of building a # fully-general Membrane (which would locate all Referenceable # objects that cross the simulated wire and replace them with # wrappers), we special-case certain methods that we happen to # know will return Referenceables. if methname == "allocate_buckets": (alreadygot, allocated) = res for shnum in allocated: allocated[shnum] = LocalWrapper(allocated[shnum]) if methname == "get_buckets": for shnum in res: res[shnum] = LocalWrapper(res[shnum]) return res d.addCallback(_return_membrane) if self.post_call_notifier: d.addCallback(self.post_call_notifier, methname) return d def notifyOnDisconnect(self, f, *args, **kwargs): m = Marker() self.disconnectors[m] = (f, args, kwargs) return m def dontNotifyOnDisconnect(self, marker): del self.disconnectors[marker] def wrap(original, service_name): # The code in immutable.checker insists upon asserting the truth of # isinstance(rref, rrefutil.WrappedRemoteReference). Much of the # upload/download code uses rref.version (which normally comes from # rrefutil.VersionedRemoteReference). To avoid using a network, we want a # LocalWrapper here. Try to satisfy all these constraints at the same # time. local = LocalWrapper(original) wrapped = rrefutil.WrappedRemoteReference(local) try: version = original.remote_get_version() except AttributeError: version = RemoteServiceConnector.VERSION_DEFAULTS[service_name] wrapped.version = version return wrapped class NoNetworkClient(Client): def create_tub(self): pass def init_introducer_client(self): pass def setup_logging(self): pass def startService(self): service.MultiService.startService(self) def stopService(self): service.MultiService.stopService(self) def when_tub_ready(self): raise NotImplementedError("NoNetworkClient has no Tub") def init_control(self): pass def init_helper(self): pass def init_key_gen(self): pass def init_storage(self): pass def init_stub_client(self): pass def get_servers(self, service_name): return self._servers def get_permuted_peers(self, service_name, key): return sorted(self._servers, key=lambda x: sha.new(key+x[0]).digest()) def get_nickname_for_peerid(self, peerid): return None class SimpleStats: def __init__(self): self.counters = {} self.stats_producers = [] def count(self, name, delta=1): val = self.counters.setdefault(name, 0) self.counters[name] = val + delta def register_producer(self, stats_producer): self.stats_producers.append(stats_producer) def get_stats(self): stats = {} for sp in self.stats_producers: stats.update(sp.get_stats()) ret = { 'counters': self.counters, 'stats': stats } return ret class NoNetworkGrid(service.MultiService): def __init__(self, basedir, num_clients=1, num_servers=10, client_config_hooks={}): service.MultiService.__init__(self) self.basedir = basedir fileutil.make_dirs(basedir) self.servers_by_number = {} self.servers_by_id = {} self.clients = [] for i in range(num_servers): ss = self.make_server(i) self.add_server(i, ss) for i in range(num_clients): clientid = hashutil.tagged_hash("clientid", str(i))[:20] clientdir = os.path.join(basedir, "clients", idlib.shortnodeid_b2a(clientid)) fileutil.make_dirs(clientdir) f = open(os.path.join(clientdir, "tahoe.cfg"), "w") f.write("[node]\n") f.write("nickname = client-%d\n" % i) f.write("web.port = tcp:0:interface=127.0.0.1\n") f.write("[storage]\n") f.write("enabled = false\n") f.close() c = None if i in client_config_hooks: # this hook can either modify tahoe.cfg, or return an # entirely new Client instance c = client_config_hooks[i](clientdir) if not c: c = NoNetworkClient(clientdir) c.nodeid = clientid c.short_nodeid = b32encode(clientid).lower()[:8] c._servers = self.all_servers # can be updated later c.setServiceParent(self) self.clients.append(c) def make_server(self, i): serverid = hashutil.tagged_hash("serverid", str(i))[:20] serverdir = os.path.join(self.basedir, "servers", idlib.shortnodeid_b2a(serverid)) fileutil.make_dirs(serverdir) ss = StorageServer(serverdir, serverid, stats_provider=SimpleStats()) return ss def add_server(self, i, ss): # to deal with the fact that all StorageServers are named 'storage', # we interpose a middleman middleman = service.MultiService() middleman.setServiceParent(self) ss.setServiceParent(middleman) serverid = ss.my_nodeid self.servers_by_number[i] = ss self.servers_by_id[serverid] = wrap(ss, "storage") self.all_servers = frozenset(self.servers_by_id.items()) for c in self.clients: c._servers = self.all_servers class GridTestMixin: def setUp(self): self.s = service.MultiService() self.s.startService() def tearDown(self): return self.s.stopService() def set_up_grid(self, num_clients=1, num_servers=10, client_config_hooks={}): # self.basedir must be set self.g = NoNetworkGrid(self.basedir, num_clients=num_clients, num_servers=num_servers, client_config_hooks=client_config_hooks) self.g.setServiceParent(self.s) self.client_webports = [c.getServiceNamed("webish").listener._port.getHost().port for c in self.g.clients] self.client_baseurls = ["http://localhost:%d/" % p for p in self.client_webports] def get_clientdir(self, i=0): return self.g.clients[i].basedir def get_serverdir(self, i): return self.g.servers_by_number[i].storedir def iterate_servers(self): for i in sorted(self.g.servers_by_number.keys()): ss = self.g.servers_by_number[i] yield (i, ss, ss.storedir) def find_shares(self, uri): si = tahoe_uri.from_string(uri).get_storage_index() prefixdir = storage_index_to_dir(si) shares = [] for i,ss in self.g.servers_by_number.items(): serverid = ss.my_nodeid basedir = os.path.join(ss.storedir, "shares", prefixdir) if not os.path.exists(basedir): continue for f in os.listdir(basedir): try: shnum = int(f) shares.append((shnum, serverid, os.path.join(basedir, f))) except ValueError: pass return sorted(shares) def delete_share(self, (shnum, serverid, sharefile)): os.unlink(sharefile) def delete_shares_numbered(self, uri, shnums): for (i_shnum, i_serverid, i_sharefile) in self.find_shares(uri): if i_shnum in shnums: os.unlink(i_sharefile) def corrupt_share(self, (shnum, serverid, sharefile), corruptor_function): sharedata = open(sharefile, "rb").read() corruptdata = corruptor_function(sharedata) open(sharefile, "wb").write(corruptdata) def corrupt_shares_numbered(self, uri, shnums, corruptor): for (i_shnum, i_serverid, i_sharefile) in self.find_shares(uri): if i_shnum in shnums: sharedata = open(i_sharefile, "rb").read() corruptdata = corruptor(sharedata) open(i_sharefile, "wb").write(corruptdata) def GET(self, urlpath, followRedirect=False, return_response=False, method="GET", clientnum=0, **kwargs): # if return_response=True, this fires with (data, statuscode, # respheaders) instead of just data. assert not isinstance(urlpath, unicode) url = self.client_baseurls[clientnum] + urlpath factory = HTTPClientGETFactory(url, method=method, followRedirect=followRedirect, **kwargs) reactor.connectTCP("localhost", self.client_webports[clientnum],factory) d = factory.deferred def _got_data(data): return (data, factory.status, factory.response_headers) if return_response: d.addCallback(_got_data) return factory.deferred