offloaded: update unit tests: assert that interrupt/resume works, and that the helper deletes tempfiles

This commit is contained in:
Brian Warner 2008-01-17 01:18:10 -07:00
parent 6b08c28f5d
commit fd0dc3013c
2 changed files with 78 additions and 53 deletions

View File

@ -1,4 +1,5 @@
import os
from twisted.trial import unittest
from twisted.application import service
@ -6,7 +7,7 @@ from foolscap import Tub, eventual
from foolscap.logging import log
from allmydata import upload, offloaded
from allmydata.util import hashutil
from allmydata.util import hashutil, fileutil
MiB = 1024*1024
@ -61,10 +62,12 @@ class AssistedUpload(unittest.TestCase):
# bogus host/port
t.setLocation("bogus:1234")
self.helper = h = offloaded.Helper(".")
def setUpHelper(self, basedir):
fileutil.make_dirs(basedir)
self.helper = h = offloaded.Helper(basedir)
h.chk_upload_helper_class = CHKUploadHelper_fake
h.setServiceParent(self.s)
self.helper_furl = t.registerReference(h)
self.helper_furl = self.tub.registerReference(h)
def tearDown(self):
d = self.s.stopService()
@ -74,6 +77,8 @@ class AssistedUpload(unittest.TestCase):
def test_one(self):
self.basedir = "helper/AssistedUpload/test_one"
self.setUpHelper(self.basedir)
u = upload.Uploader(self.helper_furl)
u.setServiceParent(self.s)
@ -92,10 +97,19 @@ class AssistedUpload(unittest.TestCase):
assert "CHK" in uri
d.addCallback(_uploaded)
def _check_empty(res):
files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
self.failUnlessEqual(files, [])
files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
self.failUnlessEqual(files, [])
d.addCallback(_check_empty)
return d
def test_already_uploaded(self):
self.basedir = "helper/AssistedUpload/test_already_uploaded"
self.setUpHelper(self.basedir)
self.helper.chk_upload_helper_class = CHKUploadHelper_already_uploaded
u = upload.Uploader(self.helper_furl)
u.setServiceParent(self.s)
@ -115,4 +129,11 @@ class AssistedUpload(unittest.TestCase):
assert "CHK" in uri
d.addCallback(_uploaded)
def _check_empty(res):
files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
self.failUnlessEqual(files, [])
files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
self.failUnlessEqual(files, [])
d.addCallback(_check_empty)
return d

View File

@ -7,15 +7,15 @@ from twisted.internet import defer, reactor
from twisted.internet import threads # CLI tests use deferToThread
from twisted.internet.error import ConnectionDone
from twisted.application import service
from allmydata import client, uri, download, upload, storage, mutable
from allmydata import client, uri, download, upload, storage, mutable, offloaded
from allmydata.introducer import IntroducerNode
from allmydata.util import deferredutil, fileutil, idlib, mathutil, testutil
from allmydata.util import log
from allmydata.scripts import runner
from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI
from allmydata.mutable import NotMutableError
from foolscap.eventual import flushEventualQueue
from foolscap import DeadReferenceError
from twisted.python import log
from twisted.python.failure import Failure
from twisted.web.client import getPage
from twisted.web.error import Error
@ -32,13 +32,6 @@ This is some data to publish to the virtual drive, which needs to be large
enough to not fit inside a LIT uri.
"""
class SmallSegmentDataUploadable(upload.Data):
def __init__(self, max_segment_size, *args, **kwargs):
self._max_segment_size = max_segment_size
upload.Data.__init__(self, *args, **kwargs)
def get_maximum_segment_size(self):
return defer.succeed(self._max_segment_size)
class SystemTest(testutil.SignalMixin, unittest.TestCase):
def setUp(self):
@ -210,7 +203,9 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
# tail segment is not the same length as the others. This actualy
# gets rounded up to 1025 to be a multiple of the number of
# required shares (since we use 25 out of 100 FEC).
d1 = u.upload(SmallSegmentDataUploadable(1024, DATA))
up = upload.Data(DATA)
up.max_segment_size = 1024
d1 = u.upload(up)
return d1
d.addCallback(_do_upload)
def _upload_done(uri):
@ -226,7 +221,9 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
# the roothash), we have to do all of the encoding work, and only
# get to save on the upload part.
log.msg("UPLOADING AGAIN")
d1 = self.uploader.upload(SmallSegmentDataUploadable(1024, DATA))
up = upload.Data(DATA)
up.max_segment_size = 1024
d1 = self.uploader.upload(up)
d.addCallback(_upload_again)
def _download_to_data(res):
@ -299,38 +296,37 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
def _upload_resumable(res):
DATA = "Data that needs help to upload and gets interrupted" * 1000
u = upload.Data(DATA)
# interrupt the first upload after 5kB
print "GOING"
from allmydata.util import log
options = {"debug_interrupt": 5000,
"debug_stash_RemoteEncryptedUploadable": True,
}
# sneak into the helper and reduce its segment size, so that our
# debug_interrupt will sever the connection on about the fifth
# segment fetched. This makes sure that we've started to write
# the new shares before we abandon them, which exercises the
# abort/delete-partial-share code.
o2 = {"max_segment_size": 1000}
self.clients[0].getServiceNamed("helper")._chk_options = o2
u1 = upload.Data(DATA)
u2 = upload.Data(DATA)
d = self.extra_node.upload(u, options)
# tell the upload to drop the connection after about 5kB
u1.debug_interrupt = 5000
u1.debug_stash_RemoteEncryptedUploadable = True
u2.debug_stash_RemoteEncryptedUploadable = True
# sneak into the helper and reduce its chunk size, so that our
# debug_interrupt will sever the connection on about the fifth
# chunk fetched. This makes sure that we've started to write the
# new shares before we abandon them, which exercises the
# abort/delete-partial-share code. TODO: find a cleaner way to do
# this. I know that this will affect later uses of the helper in
# this same test run, but I'm not currently worried about it.
offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
d = self.extra_node.upload(u1)
def _should_not_finish(res):
self.fail("interrupted upload should have failed, not finished"
" with result %s" % (res,))
def _interrupted(f):
print "interrupted"
log.msg("interrupted", level=log.WEIRD, failure=f)
f.trap(ConnectionDone, DeadReferenceError)
reu = options["RemoteEncryptedUploadable"]
print "REU.bytes", reu._bytes_read
reu = u1.debug_RemoteEncryptedUploadable
# make sure we actually interrupted it before finishing the
# file
self.failUnless(reu._bytes_read < len(DATA),
"read %d out of %d total" % (reu._bytes_read,
self.failUnless(reu._bytes_sent < len(DATA),
"read %d out of %d total" % (reu._bytes_sent,
len(DATA)))
log.msg("waiting for reconnect", level=log.WEIRD)
log.msg("waiting for reconnect", level=log.NOISY,
facility="tahoe.test.test_system")
# now, we need to give the nodes a chance to notice that this
# connection has gone away. When this happens, the storage
# servers will be told to abort their uploads, removing the
@ -347,8 +343,8 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
# check to make sure the storage servers aren't still hanging
# on to the partial share: their incoming/ directories should
# now be empty.
print "disconnected"
log.msg("disconnected", level=log.WEIRD)
log.msg("disconnected", level=log.NOISY,
facility="tahoe.test.test_system")
for i in range(self.numclients):
incdir = os.path.join(self.getdir("client%d" % i),
"storage", "shares", "incoming")
@ -358,35 +354,43 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
def _wait_for_reconnect(res):
# then we need to give the reconnector a chance to
# reestablish the connection to the helper.
d.addCallback(lambda res: log.msg("wait_for_connections",
level=log.WEIRD))
d.addCallback(lambda res:
log.msg("wait_for_connections", level=log.NOISY,
facility="tahoe.test.test_system"))
d.addCallback(lambda res: self.wait_for_connections())
d.addCallback(_wait_for_reconnect)
options2 = {"debug_stash_RemoteEncryptedUploadable": True}
def _upload_again(res):
print "uploading again"
log.msg("uploading again", level=log.WEIRD)
return self.extra_node.upload(u, options2)
log.msg("uploading again", level=log.NOISY,
facility="tahoe.test.test_system")
return self.extra_node.upload(u2)
d.addCallbacks(_upload_again)
def _uploaded(uri):
log.msg("I think its uploaded", level=log.WEIRD)
print "I tunk its uploaded", uri
reu = options2["RemoteEncryptedUploadable"]
print "REU.bytes", reu._bytes_read
log.msg("Second upload complete", level=log.NOISY,
facility="tahoe.test.test_system")
reu = u2.debug_RemoteEncryptedUploadable
# make sure we didn't read the whole file the second time
# around
#self.failUnless(reu._bytes_read < len(DATA),
# "resumption didn't save us any work:"
# " read %d bytes out of %d total" %
# (reu._bytes_read, len(DATA)))
self.failUnless(reu._bytes_sent < len(DATA),
"resumption didn't save us any work:"
" read %d bytes out of %d total" %
(reu._bytes_sent, len(DATA)))
return self.downloader.download_to_data(uri)
d.addCallback(_uploaded)
def _check(newdata):
self.failUnlessEqual(newdata, DATA)
# also check that the helper has removed the temp file from
# its directories
basedir = os.path.join(self.getdir("client0"), "helper")
files = os.listdir(os.path.join(basedir, "CHK_encoding"))
self.failUnlessEqual(files, [])
files = os.listdir(os.path.join(basedir, "CHK_incoming"))
self.failUnlessEqual(files, [])
d.addCallback(_check)
return d
#d.addCallback(_upload_resumable)
d.addCallback(_upload_resumable)
return d
test_upload_and_download.timeout = 4800