from base64 import b32encode import os, sys, time, simplejson from cStringIO import StringIO from twisted.trial import unittest from twisted.internet import defer from twisted.internet import threads # CLI tests use deferToThread import allmydata from allmydata import uri from allmydata.storage.mutable import MutableShareFile from allmydata.storage.server import si_a2b from allmydata.immutable import offloaded, upload from allmydata.immutable.literal import LiteralFileNode from allmydata.immutable.filenode import ImmutableFileNode from allmydata.util import idlib, mathutil from allmydata.util import log, base32 from allmydata.util.encodingutil import quote_output, unicode_to_argv, get_filesystem_encoding from allmydata.util.fileutil import abspath_expanduser_unicode from allmydata.util.consumer import MemoryConsumer, download_to_data from allmydata.scripts import runner from allmydata.interfaces import IDirectoryNode, IFileNode, \ NoSuchChildError, NoSharesError from allmydata.monitor import Monitor from allmydata.mutable.common import NotWriteableError from allmydata.mutable import layout as mutable_layout from foolscap.api import DeadReferenceError from twisted.python.failure import Failure from twisted.web.client import getPage from twisted.web.error import Error from allmydata.test.common import SystemTestMixin LARGE_DATA = """ This is some data to publish to the remote grid.., which needs to be large enough to not fit inside a LIT uri. """ class CountingDataUploadable(upload.Data): bytes_read = 0 interrupt_after = None interrupt_after_d = None def read(self, length): self.bytes_read += length if self.interrupt_after is not None: if self.bytes_read > self.interrupt_after: self.interrupt_after = None self.interrupt_after_d.callback(self) return upload.Data.read(self, length) class SystemTest(SystemTestMixin, unittest.TestCase): timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box. def test_connections(self): self.basedir = "system/SystemTest/test_connections" d = self.set_up_nodes() self.extra_node = None d.addCallback(lambda res: self.add_extra_node(self.numclients)) def _check(extra_node): self.extra_node = extra_node for c in self.clients: all_peerids = c.get_storage_broker().get_all_serverids() self.failUnlessEqual(len(all_peerids), self.numclients+1) sb = c.storage_broker permuted_peers = sb.get_servers_for_index("a") self.failUnlessEqual(len(permuted_peers), self.numclients+1) d.addCallback(_check) def _shutdown_extra_node(res): if self.extra_node: return self.extra_node.stopService() return res d.addBoth(_shutdown_extra_node) return d # test_connections is subsumed by test_upload_and_download, and takes # quite a while to run on a slow machine (because of all the TLS # connections that must be established). If we ever rework the introducer # code to such an extent that we're not sure if it works anymore, we can # reinstate this test until it does. del test_connections def test_upload_and_download_random_key(self): self.basedir = "system/SystemTest/test_upload_and_download_random_key" return self._test_upload_and_download(convergence=None) def test_upload_and_download_convergent(self): self.basedir = "system/SystemTest/test_upload_and_download_convergent" return self._test_upload_and_download(convergence="some convergence string") def _test_upload_and_download(self, convergence): # we use 4000 bytes of data, which will result in about 400k written # to disk among all our simulated nodes DATA = "Some data to upload\n" * 200 d = self.set_up_nodes() def _check_connections(res): for c in self.clients: c.DEFAULT_ENCODING_PARAMETERS['happy'] = 5 all_peerids = c.get_storage_broker().get_all_serverids() self.failUnlessEqual(len(all_peerids), self.numclients) sb = c.storage_broker permuted_peers = sb.get_servers_for_index("a") self.failUnlessEqual(len(permuted_peers), self.numclients) d.addCallback(_check_connections) def _do_upload(res): log.msg("UPLOADING") u = self.clients[0].getServiceNamed("uploader") self.uploader = u # we crank the max segsize down to 1024b for the duration of this # test, so we can exercise multiple segments. It is important # that this is not a multiple of the segment size, so that the # tail segment is not the same length as the others. This actualy # gets rounded up to 1025 to be a multiple of the number of # required shares (since we use 25 out of 100 FEC). up = upload.Data(DATA, convergence=convergence) up.max_segment_size = 1024 d1 = u.upload(up) return d1 d.addCallback(_do_upload) def _upload_done(results): theuri = results.uri log.msg("upload finished: uri is %s" % (theuri,)) self.uri = theuri assert isinstance(self.uri, str), self.uri self.cap = uri.from_string(self.uri) self.n = self.clients[1].create_node_from_uri(self.uri) d.addCallback(_upload_done) def _upload_again(res): # Upload again. If using convergent encryption then this ought to be # short-circuited, however with the way we currently generate URIs # (i.e. because they include the roothash), we have to do all of the # encoding work, and only get to save on the upload part. log.msg("UPLOADING AGAIN") up = upload.Data(DATA, convergence=convergence) up.max_segment_size = 1024 return self.uploader.upload(up) d.addCallback(_upload_again) def _download_to_data(res): log.msg("DOWNLOADING") return download_to_data(self.n) d.addCallback(_download_to_data) def _download_to_data_done(data): log.msg("download finished") self.failUnlessEqual(data, DATA) d.addCallback(_download_to_data_done) def _test_read(res): n = self.clients[1].create_node_from_uri(self.uri) d = download_to_data(n) def _read_done(data): self.failUnlessEqual(data, DATA) d.addCallback(_read_done) d.addCallback(lambda ign: n.read(MemoryConsumer(), offset=1, size=4)) def _read_portion_done(mc): self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4]) d.addCallback(_read_portion_done) d.addCallback(lambda ign: n.read(MemoryConsumer(), offset=2, size=None)) def _read_tail_done(mc): self.failUnlessEqual("".join(mc.chunks), DATA[2:]) d.addCallback(_read_tail_done) d.addCallback(lambda ign: n.read(MemoryConsumer(), size=len(DATA)+1000)) def _read_too_much(mc): self.failUnlessEqual("".join(mc.chunks), DATA) d.addCallback(_read_too_much) return d d.addCallback(_test_read) def _test_bad_read(res): bad_u = uri.from_string_filenode(self.uri) bad_u.key = self.flip_bit(bad_u.key) bad_n = self.clients[1].create_node_from_uri(bad_u.to_string()) # this should cause an error during download d = self.shouldFail2(NoSharesError, "'download bad node'", None, bad_n.read, MemoryConsumer(), offset=2) return d d.addCallback(_test_bad_read) def _download_nonexistent_uri(res): baduri = self.mangle_uri(self.uri) badnode = self.clients[1].create_node_from_uri(baduri) log.msg("about to download non-existent URI", level=log.UNUSUAL, facility="tahoe.tests") d1 = download_to_data(badnode) def _baduri_should_fail(res): log.msg("finished downloading non-existend URI", level=log.UNUSUAL, facility="tahoe.tests") self.failUnless(isinstance(res, Failure)) self.failUnless(res.check(NoSharesError), "expected NoSharesError, got %s" % res) d1.addBoth(_baduri_should_fail) return d1 d.addCallback(_download_nonexistent_uri) # add a new node, which doesn't accept shares, and only uses the # helper for upload. d.addCallback(lambda res: self.add_extra_node(self.numclients, self.helper_furl, add_to_sparent=True)) def _added(extra_node): self.extra_node = extra_node self.extra_node.DEFAULT_ENCODING_PARAMETERS['happy'] = 5 d.addCallback(_added) HELPER_DATA = "Data that needs help to upload" * 1000 def _upload_with_helper(res): u = upload.Data(HELPER_DATA, convergence=convergence) d = self.extra_node.upload(u) def _uploaded(results): n = self.clients[1].create_node_from_uri(results.uri) return download_to_data(n) d.addCallback(_uploaded) def _check(newdata): self.failUnlessEqual(newdata, HELPER_DATA) d.addCallback(_check) return d d.addCallback(_upload_with_helper) def _upload_duplicate_with_helper(res): u = upload.Data(HELPER_DATA, convergence=convergence) u.debug_stash_RemoteEncryptedUploadable = True d = self.extra_node.upload(u) def _uploaded(results): n = self.clients[1].create_node_from_uri(results.uri) return download_to_data(n) d.addCallback(_uploaded) def _check(newdata): self.failUnlessEqual(newdata, HELPER_DATA) self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"), "uploadable started uploading, should have been avoided") d.addCallback(_check) return d if convergence is not None: d.addCallback(_upload_duplicate_with_helper) def _upload_resumable(res): DATA = "Data that needs help to upload and gets interrupted" * 1000 u1 = CountingDataUploadable(DATA, convergence=convergence) u2 = CountingDataUploadable(DATA, convergence=convergence) # we interrupt the connection after about 5kB by shutting down # the helper, then restartingit. u1.interrupt_after = 5000 u1.interrupt_after_d = defer.Deferred() u1.interrupt_after_d.addCallback(lambda res: self.bounce_client(0)) # sneak into the helper and reduce its chunk size, so that our # debug_interrupt will sever the connection on about the fifth # chunk fetched. This makes sure that we've started to write the # new shares before we abandon them, which exercises the # abort/delete-partial-share code. TODO: find a cleaner way to do # this. I know that this will affect later uses of the helper in # this same test run, but I'm not currently worried about it. offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000 d = self.extra_node.upload(u1) def _should_not_finish(res): self.fail("interrupted upload should have failed, not finished" " with result %s" % (res,)) def _interrupted(f): f.trap(DeadReferenceError) # make sure we actually interrupted it before finishing the # file self.failUnless(u1.bytes_read < len(DATA), "read %d out of %d total" % (u1.bytes_read, len(DATA))) log.msg("waiting for reconnect", level=log.NOISY, facility="tahoe.test.test_system") # now, we need to give the nodes a chance to notice that this # connection has gone away. When this happens, the storage # servers will be told to abort their uploads, removing the # partial shares. Unfortunately this involves TCP messages # going through the loopback interface, and we can't easily # predict how long that will take. If it were all local, we # could use fireEventually() to stall. Since we don't have # the right introduction hooks, the best we can do is use a # fixed delay. TODO: this is fragile. u1.interrupt_after_d.addCallback(self.stall, 2.0) return u1.interrupt_after_d d.addCallbacks(_should_not_finish, _interrupted) def _disconnected(res): # check to make sure the storage servers aren't still hanging # on to the partial share: their incoming/ directories should # now be empty. log.msg("disconnected", level=log.NOISY, facility="tahoe.test.test_system") for i in range(self.numclients): incdir = os.path.join(self.getdir("client%d" % i), "storage", "shares", "incoming") self.failIf(os.path.exists(incdir) and os.listdir(incdir)) d.addCallback(_disconnected) # then we need to give the reconnector a chance to # reestablish the connection to the helper. d.addCallback(lambda res: log.msg("wait_for_connections", level=log.NOISY, facility="tahoe.test.test_system")) d.addCallback(lambda res: self.wait_for_connections()) d.addCallback(lambda res: log.msg("uploading again", level=log.NOISY, facility="tahoe.test.test_system")) d.addCallback(lambda res: self.extra_node.upload(u2)) def _uploaded(results): cap = results.uri log.msg("Second upload complete", level=log.NOISY, facility="tahoe.test.test_system") # this is really bytes received rather than sent, but it's # convenient and basically measures the same thing bytes_sent = results.ciphertext_fetched self.failUnless(isinstance(bytes_sent, (int, long)), bytes_sent) # We currently don't support resumption of upload if the data is # encrypted with a random key. (Because that would require us # to store the key locally and re-use it on the next upload of # this file, which isn't a bad thing to do, but we currently # don't do it.) if convergence is not None: # Make sure we did not have to read the whole file the # second time around . self.failUnless(bytes_sent < len(DATA), "resumption didn't save us any work:" " read %r bytes out of %r total" % (bytes_sent, len(DATA))) else: # Make sure we did have to read the whole file the second # time around -- because the one that we partially uploaded # earlier was encrypted with a different random key. self.failIf(bytes_sent < len(DATA), "resumption saved us some work even though we were using random keys:" " read %r bytes out of %r total" % (bytes_sent, len(DATA))) n = self.clients[1].create_node_from_uri(cap) return download_to_data(n) d.addCallback(_uploaded) def _check(newdata): self.failUnlessEqual(newdata, DATA) # If using convergent encryption, then also check that the # helper has removed the temp file from its directories. if convergence is not None: basedir = os.path.join(self.getdir("client0"), "helper") files = os.listdir(os.path.join(basedir, "CHK_encoding")) self.failUnlessEqual(files, []) files = os.listdir(os.path.join(basedir, "CHK_incoming")) self.failUnlessEqual(files, []) d.addCallback(_check) return d d.addCallback(_upload_resumable) def _grab_stats(ignored): # the StatsProvider doesn't normally publish a FURL: # instead it passes a live reference to the StatsGatherer # (if and when it connects). To exercise the remote stats # interface, we manually publish client0's StatsProvider # and use client1 to query it. sp = self.clients[0].stats_provider sp_furl = self.clients[0].tub.registerReference(sp) d = self.clients[1].tub.getReference(sp_furl) d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats")) def _got_stats(stats): #print "STATS" #from pprint import pprint #pprint(stats) s = stats["stats"] self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1) c = stats["counters"] self.failUnless("storage_server.allocate" in c) d.addCallback(_got_stats) return d d.addCallback(_grab_stats) return d def _find_all_shares(self, basedir): shares = [] for (dirpath, dirnames, filenames) in os.walk(basedir): if "storage" not in dirpath: continue if not filenames: continue pieces = dirpath.split(os.sep) if (len(pieces) >= 5 and pieces[-4] == "storage" and pieces[-3] == "shares"): # we're sitting in .../storage/shares/$START/$SINDEX , and there # are sharefiles here assert pieces[-5].startswith("client") client_num = int(pieces[-5][-1]) storage_index_s = pieces[-1] storage_index = si_a2b(storage_index_s) for sharename in filenames: shnum = int(sharename) filename = os.path.join(dirpath, sharename) data = (client_num, storage_index, filename, shnum) shares.append(data) if not shares: self.fail("unable to find any share files in %s" % basedir) return shares def _corrupt_mutable_share(self, filename, which): msf = MutableShareFile(filename) datav = msf.readv([ (0, 1000000) ]) final_share = datav[0] assert len(final_share) < 1000000 # ought to be truncated pieces = mutable_layout.unpack_share(final_share) (seqnum, root_hash, IV, k, N, segsize, datalen, verification_key, signature, share_hash_chain, block_hash_tree, share_data, enc_privkey) = pieces if which == "seqnum": seqnum = seqnum + 15 elif which == "R": root_hash = self.flip_bit(root_hash) elif which == "IV": IV = self.flip_bit(IV) elif which == "segsize": segsize = segsize + 15 elif which == "pubkey": verification_key = self.flip_bit(verification_key) elif which == "signature": signature = self.flip_bit(signature) elif which == "share_hash_chain": nodenum = share_hash_chain.keys()[0] share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum]) elif which == "block_hash_tree": block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1]) elif which == "share_data": share_data = self.flip_bit(share_data) elif which == "encprivkey": enc_privkey = self.flip_bit(enc_privkey) prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N, segsize, datalen) final_share = mutable_layout.pack_share(prefix, verification_key, signature, share_hash_chain, block_hash_tree, share_data, enc_privkey) msf.writev( [(0, final_share)], None) def test_mutable(self): self.basedir = "system/SystemTest/test_mutable" DATA = "initial contents go here." # 25 bytes % 3 != 0 NEWDATA = "new contents yay" NEWERDATA = "this is getting old" d = self.set_up_nodes(use_key_generator=True) def _create_mutable(res): c = self.clients[0] log.msg("starting create_mutable_file") d1 = c.create_mutable_file(DATA) def _done(res): log.msg("DONE: %s" % (res,)) self._mutable_node_1 = res d1.addCallback(_done) return d1 d.addCallback(_create_mutable) def _test_debug(res): # find a share. It is important to run this while there is only # one slot in the grid. shares = self._find_all_shares(self.basedir) (client_num, storage_index, filename, shnum) = shares[0] log.msg("test_system.SystemTest.test_mutable._test_debug using %s" % filename) log.msg(" for clients[%d]" % client_num) out,err = StringIO(), StringIO() rc = runner.runner(["debug", "dump-share", "--offsets", filename], stdout=out, stderr=err) output = out.getvalue() self.failUnlessEqual(rc, 0) try: self.failUnless("Mutable slot found:\n" in output) self.failUnless("share_type: SDMF\n" in output) peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid) self.failUnless(" WE for nodeid: %s\n" % peerid in output) self.failUnless(" num_extra_leases: 0\n" in output) self.failUnless(" secrets are for nodeid: %s\n" % peerid in output) self.failUnless(" SDMF contents:\n" in output) self.failUnless(" seqnum: 1\n" in output) self.failUnless(" required_shares: 3\n" in output) self.failUnless(" total_shares: 10\n" in output) self.failUnless(" segsize: 27\n" in output, (output, filename)) self.failUnless(" datalen: 25\n" in output) # the exact share_hash_chain nodes depends upon the sharenum, # and is more of a hassle to compute than I want to deal with # now self.failUnless(" share_hash_chain: " in output) self.failUnless(" block_hash_tree: 1 nodes\n" in output) expected = (" verify-cap: URI:SSK-Verifier:%s:" % base32.b2a(storage_index)) self.failUnless(expected in output) except unittest.FailTest: print print "dump-share output was:" print output raise d.addCallback(_test_debug) # test retrieval # first, let's see if we can use the existing node to retrieve the # contents. This allows it to use the cached pubkey and maybe the # latest-known sharemap. d.addCallback(lambda res: self._mutable_node_1.download_best_version()) def _check_download_1(res): self.failUnlessEqual(res, DATA) # now we see if we can retrieve the data from a new node, # constructed using the URI of the original one. We do this test # on the same client that uploaded the data. uri = self._mutable_node_1.get_uri() log.msg("starting retrieve1") newnode = self.clients[0].create_node_from_uri(uri) newnode_2 = self.clients[0].create_node_from_uri(uri) self.failUnlessIdentical(newnode, newnode_2) return newnode.download_best_version() d.addCallback(_check_download_1) def _check_download_2(res): self.failUnlessEqual(res, DATA) # same thing, but with a different client uri = self._mutable_node_1.get_uri() newnode = self.clients[1].create_node_from_uri(uri) log.msg("starting retrieve2") d1 = newnode.download_best_version() d1.addCallback(lambda res: (res, newnode)) return d1 d.addCallback(_check_download_2) def _check_download_3((res, newnode)): self.failUnlessEqual(res, DATA) # replace the data log.msg("starting replace1") d1 = newnode.overwrite(NEWDATA) d1.addCallback(lambda res: newnode.download_best_version()) return d1 d.addCallback(_check_download_3) def _check_download_4(res): self.failUnlessEqual(res, NEWDATA) # now create an even newer node and replace the data on it. This # new node has never been used for download before. uri = self._mutable_node_1.get_uri() newnode1 = self.clients[2].create_node_from_uri(uri) newnode2 = self.clients[3].create_node_from_uri(uri) self._newnode3 = self.clients[3].create_node_from_uri(uri) log.msg("starting replace2") d1 = newnode1.overwrite(NEWERDATA) d1.addCallback(lambda res: newnode2.download_best_version()) return d1 d.addCallback(_check_download_4) def _check_download_5(res): log.msg("finished replace2") self.failUnlessEqual(res, NEWERDATA) d.addCallback(_check_download_5) def _corrupt_shares(res): # run around and flip bits in all but k of the shares, to test # the hash checks shares = self._find_all_shares(self.basedir) ## sort by share number #shares.sort( lambda a,b: cmp(a[3], b[3]) ) where = dict([ (shnum, filename) for (client_num, storage_index, filename, shnum) in shares ]) assert len(where) == 10 # this test is designed for 3-of-10 for shnum, filename in where.items(): # shares 7,8,9 are left alone. read will check # (share_hash_chain, block_hash_tree, share_data). New # seqnum+R pairs will trigger a check of (seqnum, R, IV, # segsize, signature). if shnum == 0: # read: this will trigger "pubkey doesn't match # fingerprint". self._corrupt_mutable_share(filename, "pubkey") self._corrupt_mutable_share(filename, "encprivkey") elif shnum == 1: # triggers "signature is invalid" self._corrupt_mutable_share(filename, "seqnum") elif shnum == 2: # triggers "signature is invalid" self._corrupt_mutable_share(filename, "R") elif shnum == 3: # triggers "signature is invalid" self._corrupt_mutable_share(filename, "segsize") elif shnum == 4: self._corrupt_mutable_share(filename, "share_hash_chain") elif shnum == 5: self._corrupt_mutable_share(filename, "block_hash_tree") elif shnum == 6: self._corrupt_mutable_share(filename, "share_data") # other things to correct: IV, signature # 7,8,9 are left alone # note that initial_query_count=5 means that we'll hit the # first 5 servers in effectively random order (based upon # response time), so we won't necessarily ever get a "pubkey # doesn't match fingerprint" error (if we hit shnum>=1 before # shnum=0, we pull the pubkey from there). To get repeatable # specific failures, we need to set initial_query_count=1, # but of course that will change the sequencing behavior of # the retrieval process. TODO: find a reasonable way to make # this a parameter, probably when we expand this test to test # for one failure mode at a time. # when we retrieve this, we should get three signature # failures (where we've mangled seqnum, R, and segsize). The # pubkey mangling d.addCallback(_corrupt_shares) d.addCallback(lambda res: self._newnode3.download_best_version()) d.addCallback(_check_download_5) def _check_empty_file(res): # make sure we can create empty files, this usually screws up the # segsize math d1 = self.clients[2].create_mutable_file("") d1.addCallback(lambda newnode: newnode.download_best_version()) d1.addCallback(lambda res: self.failUnlessEqual("", res)) return d1 d.addCallback(_check_empty_file) d.addCallback(lambda res: self.clients[0].create_dirnode()) def _created_dirnode(dnode): log.msg("_created_dirnode(%s)" % (dnode,)) d1 = dnode.list() d1.addCallback(lambda children: self.failUnlessEqual(children, {})) d1.addCallback(lambda res: dnode.has_child(u"edgar")) d1.addCallback(lambda answer: self.failUnlessEqual(answer, False)) d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode)) d1.addCallback(lambda res: dnode.has_child(u"see recursive")) d1.addCallback(lambda answer: self.failUnlessEqual(answer, True)) d1.addCallback(lambda res: dnode.build_manifest().when_done()) d1.addCallback(lambda res: self.failUnlessEqual(len(res["manifest"]), 1)) return d1 d.addCallback(_created_dirnode) def wait_for_c3_kg_conn(): return self.clients[3]._key_generator is not None d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn)) def check_kg_poolsize(junk, size_delta): self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool), self.key_generator_svc.key_generator.pool_size + size_delta) d.addCallback(check_kg_poolsize, 0) d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world')) d.addCallback(check_kg_poolsize, -1) d.addCallback(lambda junk: self.clients[3].create_dirnode()) d.addCallback(check_kg_poolsize, -2) # use_helper induces use of clients[3], which is the using-key_gen client d.addCallback(lambda junk: self.POST("uri?t=mkdir&name=george", use_helper=True)) d.addCallback(check_kg_poolsize, -3) return d def flip_bit(self, good): return good[:-1] + chr(ord(good[-1]) ^ 0x01) def mangle_uri(self, gooduri): # change the key, which changes the storage index, which means we'll # be asking about the wrong file, so nobody will have any shares u = uri.from_string(gooduri) u2 = uri.CHKFileURI(key=self.flip_bit(u.key), uri_extension_hash=u.uri_extension_hash, needed_shares=u.needed_shares, total_shares=u.total_shares, size=u.size) return u2.to_string() # TODO: add a test which mangles the uri_extension_hash instead, and # should fail due to not being able to get a valid uri_extension block. # Also a test which sneakily mangles the uri_extension block to change # some of the validation data, so it will fail in the post-download phase # when the file's crypttext integrity check fails. Do the same thing for # the key, which should cause the download to fail the post-download # plaintext_hash check. def test_filesystem(self): self.basedir = "system/SystemTest/test_filesystem" self.data = LARGE_DATA d = self.set_up_nodes(use_stats_gatherer=True) def _new_happy_semantics(ign): for c in self.clients: c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1 d.addCallback(_new_happy_semantics) d.addCallback(self._test_introweb) d.addCallback(self.log, "starting publish") d.addCallback(self._do_publish1) d.addCallback(self._test_runner) d.addCallback(self._do_publish2) # at this point, we have the following filesystem (where "R" denotes # self._root_directory_uri): # R # R/subdir1 # R/subdir1/mydata567 # R/subdir1/subdir2/ # R/subdir1/subdir2/mydata992 d.addCallback(lambda res: self.bounce_client(0)) d.addCallback(self.log, "bounced client0") d.addCallback(self._check_publish1) d.addCallback(self.log, "did _check_publish1") d.addCallback(self._check_publish2) d.addCallback(self.log, "did _check_publish2") d.addCallback(self._do_publish_private) d.addCallback(self.log, "did _do_publish_private") # now we also have (where "P" denotes a new dir): # P/personal/sekrit data # P/s2-rw -> /subdir1/subdir2/ # P/s2-ro -> /subdir1/subdir2/ (read-only) d.addCallback(self._check_publish_private) d.addCallback(self.log, "did _check_publish_private") d.addCallback(self._test_web) d.addCallback(self._test_control) d.addCallback(self._test_cli) # P now has four top-level children: # P/personal/sekrit data # P/s2-ro/ # P/s2-rw/ # P/test_put/ (empty) d.addCallback(self._test_checker) return d def _test_introweb(self, res): d = getPage(self.introweb_url, method="GET", followRedirect=True) def _check(res): try: self.failUnless("%s: %s" % (allmydata.__appname__, allmydata.__version__) in res) self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res) self.failUnless("Subscription Summary: storage: 5" in res) except unittest.FailTest: print print "GET %s output was:" % self.introweb_url print res raise d.addCallback(_check) d.addCallback(lambda res: getPage(self.introweb_url + "?t=json", method="GET", followRedirect=True)) def _check_json(res): data = simplejson.loads(res) try: self.failUnlessEqual(data["subscription_summary"], {"storage": 5}) self.failUnlessEqual(data["announcement_summary"], {"storage": 5, "stub_client": 5}) self.failUnlessEqual(data["announcement_distinct_hosts"], {"storage": 1, "stub_client": 1}) except unittest.FailTest: print print "GET %s?t=json output was:" % self.introweb_url print res raise d.addCallback(_check_json) return d def _do_publish1(self, res): ut = upload.Data(self.data, convergence=None) c0 = self.clients[0] d = c0.create_dirnode() def _made_root(new_dirnode): self._root_directory_uri = new_dirnode.get_uri() return c0.create_node_from_uri(self._root_directory_uri) d.addCallback(_made_root) d.addCallback(lambda root: root.create_subdirectory(u"subdir1")) def _made_subdir1(subdir1_node): self._subdir1_node = subdir1_node d1 = subdir1_node.add_file(u"mydata567", ut) d1.addCallback(self.log, "publish finished") def _stash_uri(filenode): self.uri = filenode.get_uri() assert isinstance(self.uri, str), (self.uri, filenode) d1.addCallback(_stash_uri) return d1 d.addCallback(_made_subdir1) return d def _do_publish2(self, res): ut = upload.Data(self.data, convergence=None) d = self._subdir1_node.create_subdirectory(u"subdir2") d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut)) return d def log(self, res, *args, **kwargs): # print "MSG: %s RES: %s" % (msg, args) log.msg(*args, **kwargs) return res def _do_publish_private(self, res): self.smalldata = "sssh, very secret stuff" ut = upload.Data(self.smalldata, convergence=None) d = self.clients[0].create_dirnode() d.addCallback(self.log, "GOT private directory") def _got_new_dir(privnode): rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri) d1 = privnode.create_subdirectory(u"personal") d1.addCallback(self.log, "made P/personal") d1.addCallback(lambda node: node.add_file(u"sekrit data", ut)) d1.addCallback(self.log, "made P/personal/sekrit data") d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"])) def _got_s2(s2node): d2 = privnode.set_uri(u"s2-rw", s2node.get_uri(), s2node.get_readonly_uri()) d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri(), s2node.get_readonly_uri())) return d2 d1.addCallback(_got_s2) d1.addCallback(lambda res: privnode) return d1 d.addCallback(_got_new_dir) return d def _check_publish1(self, res): # this one uses the iterative API c1 = self.clients[1] d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri)) d.addCallback(self.log, "check_publish1 got /") d.addCallback(lambda root: root.get(u"subdir1")) d.addCallback(lambda subdir1: subdir1.get(u"mydata567")) d.addCallback(lambda filenode: download_to_data(filenode)) d.addCallback(self.log, "get finished") def _get_done(data): self.failUnlessEqual(data, self.data) d.addCallback(_get_done) return d def _check_publish2(self, res): # this one uses the path-based API rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri) d = rootnode.get_child_at_path(u"subdir1") d.addCallback(lambda dirnode: self.failUnless(IDirectoryNode.providedBy(dirnode))) d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567")) d.addCallback(lambda filenode: download_to_data(filenode)) d.addCallback(lambda data: self.failUnlessEqual(data, self.data)) d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567")) def _got_filenode(filenode): fnode = self.clients[1].create_node_from_uri(filenode.get_uri()) assert fnode == filenode d.addCallback(_got_filenode) return d def _check_publish_private(self, resnode): # this one uses the path-based API self._private_node = resnode d = self._private_node.get_child_at_path(u"personal") def _got_personal(personal): self._personal_node = personal return personal d.addCallback(_got_personal) d.addCallback(lambda dirnode: self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode)) def get_path(path): return self._private_node.get_child_at_path(path) d.addCallback(lambda res: get_path(u"personal/sekrit data")) d.addCallback(lambda filenode: download_to_data(filenode)) d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata)) d.addCallback(lambda res: get_path(u"s2-rw")) d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable())) d.addCallback(lambda res: get_path(u"s2-ro")) def _got_s2ro(dirnode): self.failUnless(dirnode.is_mutable(), dirnode) self.failUnless(dirnode.is_readonly(), dirnode) d1 = defer.succeed(None) d1.addCallback(lambda res: dirnode.list()) d1.addCallback(self.log, "dirnode.list") d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mkdir(nope)", None, dirnode.create_subdirectory, u"nope")) d1.addCallback(self.log, "doing add_file(ro)") ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)") d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut)) d1.addCallback(self.log, "doing get(ro)") d1.addCallback(lambda res: dirnode.get(u"mydata992")) d1.addCallback(lambda filenode: self.failUnless(IFileNode.providedBy(filenode))) d1.addCallback(self.log, "doing delete(ro)") d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "delete(nope)", None, dirnode.delete, u"mydata992")) d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri, self.uri)) d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing")) personal = self._personal_node d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope")) d1.addCallback(self.log, "doing move_child_to(ro)2") d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope")) d1.addCallback(self.log, "finished with _got_s2ro") return d1 d.addCallback(_got_s2ro) def _got_home(dummy): home = self._private_node personal = self._personal_node d1 = defer.succeed(None) d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit") d1.addCallback(lambda res: personal.move_child_to(u"sekrit data",home,u"sekrit")) d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'") d1.addCallback(lambda res: home.move_child_to(u"sekrit", home, u"sekrit data")) d1.addCallback(self.log, "mv 'P/sekret data' P/personal/") d1.addCallback(lambda res: home.move_child_to(u"sekrit data", personal)) d1.addCallback(lambda res: home.build_manifest().when_done()) d1.addCallback(self.log, "manifest") # five items: # P/ # P/personal/ # P/personal/sekrit data # P/s2-rw (same as P/s2-ro) # P/s2-rw/mydata992 (same as P/s2-rw/mydata992) d1.addCallback(lambda res: self.failUnlessEqual(len(res["manifest"]), 5)) d1.addCallback(lambda res: home.start_deep_stats().when_done()) def _check_stats(stats): expected = {"count-immutable-files": 1, "count-mutable-files": 0, "count-literal-files": 1, "count-files": 2, "count-directories": 3, "size-immutable-files": 112, "size-literal-files": 23, #"size-directories": 616, # varies #"largest-directory": 616, "largest-directory-children": 3, "largest-immutable-file": 112, } for k,v in expected.iteritems(): self.failUnlessEqual(stats[k], v, "stats[%s] was %s, not %s" % (k, stats[k], v)) self.failUnless(stats["size-directories"] > 1300, stats["size-directories"]) self.failUnless(stats["largest-directory"] > 800, stats["largest-directory"]) self.failUnlessEqual(stats["size-files-histogram"], [ (11, 31, 1), (101, 316, 1) ]) d1.addCallback(_check_stats) return d1 d.addCallback(_got_home) return d def shouldFail(self, res, expected_failure, which, substring=None): if isinstance(res, Failure): res.trap(expected_failure) if substring: self.failUnless(substring in str(res), "substring '%s' not in '%s'" % (substring, str(res))) else: self.fail("%s was supposed to raise %s, not get '%s'" % (which, expected_failure, res)) def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs): assert substring is None or isinstance(substring, str) d = defer.maybeDeferred(callable, *args, **kwargs) def done(res): if isinstance(res, Failure): res.trap(expected_failure) if substring: self.failUnless(substring in str(res), "substring '%s' not in '%s'" % (substring, str(res))) else: self.fail("%s was supposed to raise %s, not get '%s'" % (which, expected_failure, res)) d.addBoth(done) return d def PUT(self, urlpath, data): url = self.webish_url + urlpath return getPage(url, method="PUT", postdata=data) def GET(self, urlpath, followRedirect=False): url = self.webish_url + urlpath return getPage(url, method="GET", followRedirect=followRedirect) def POST(self, urlpath, followRedirect=False, use_helper=False, **fields): sepbase = "boogabooga" sep = "--" + sepbase form = [] form.append(sep) form.append('Content-Disposition: form-data; name="_charset"') form.append('') form.append('UTF-8') form.append(sep) for name, value in fields.iteritems(): if isinstance(value, tuple): filename, value = value form.append('Content-Disposition: form-data; name="%s"; ' 'filename="%s"' % (name, filename.encode("utf-8"))) else: form.append('Content-Disposition: form-data; name="%s"' % name) form.append('') form.append(str(value)) form.append(sep) form[-1] += "--" body = "" headers = {} if fields: body = "\r\n".join(form) + "\r\n" headers["content-type"] = "multipart/form-data; boundary=%s" % sepbase return self.POST2(urlpath, body, headers, followRedirect, use_helper) def POST2(self, urlpath, body="", headers={}, followRedirect=False, use_helper=False): if use_helper: url = self.helper_webish_url + urlpath else: url = self.webish_url + urlpath return getPage(url, method="POST", postdata=body, headers=headers, followRedirect=followRedirect) def _test_web(self, res): base = self.webish_url public = "uri/" + self._root_directory_uri d = getPage(base) def _got_welcome(page): # XXX This test is oversensitive to formatting expected = "Connected to %d\n of %d known storage servers:" % (self.numclients, self.numclients) self.failUnless(expected in page, "I didn't see the right 'connected storage servers'" " message in: %s" % page ) expected = "