mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-02-01 00:45:52 +00:00
offloaded: improve logging, pass through options, get ready for testing interrupted uploads. test_system: add (disabled) interrupted-upload test
This commit is contained in:
parent
a6ca98ac53
commit
168a8c3b73
@ -4,6 +4,7 @@ from twisted.application import service
|
||||
from twisted.internet import defer
|
||||
from foolscap import Referenceable
|
||||
from allmydata import upload, interfaces
|
||||
from allmydata.util import idlib
|
||||
|
||||
|
||||
|
||||
@ -14,17 +15,26 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
|
||||
"""
|
||||
implements(interfaces.RICHKUploadHelper)
|
||||
|
||||
def __init__(self, storage_index, helper, log_number, options=None):
    """Prepare a helper-side CHK upload for one storage index.

    storage_index: storage index of the file being uploaded. The first
                   six characters of idlib.b2a(storage_index) are used
                   to identify this upload in the log.
    helper: the object that created us. Its log() method receives our
            startup message, its .parent becomes self._client, and
            finished() later reports back to it.
    log_number: log entry to use as the parent= of our startup message;
                also remembered as self._log_number.
    options: optional dict of upload options, stored (by reference) as
             self._options. When omitted, a fresh empty dict is used.
    """
    # a literal {} default would be created once and then shared by every
    # instance that was built without explicit options
    if options is None:
        options = {}
    self._finished = False
    self._storage_index = storage_index
    self._helper = helper
    upload_id = idlib.b2a(storage_index)[:6]
    self._log_number = log_number
    self._helper.log("CHKUploadHelper starting for SI %s" % upload_id,
                     parent=log_number)

    self._client = helper.parent
    self._options = options
    # remote readers currently attached to this upload
    self._readers = []

    self.set_params( (3,7,10) ) # GACK
|
||||
|
||||
def log(self, *args, **kwargs):
    """Forward a log call to upload.CHKUploader.log.

    The message is tagged with the "tahoe.helper" facility unless the
    caller passed an explicit facility= keyword. Returns whatever
    upload.CHKUploader.log returns.
    """
    kwargs.setdefault('facility', "tahoe.helper")
    return upload.CHKUploader.log(self, *args, **kwargs)
|
||||
|
||||
def start(self):
|
||||
# determine if we need to upload the file. If so, return ({},self) .
|
||||
# If not, return (UploadResults,None) .
|
||||
@ -35,6 +45,8 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
|
||||
# reader is an RIEncryptedUploadable. I am specified to return an
|
||||
# UploadResults dictionary.
|
||||
|
||||
self._readers.append(reader)
|
||||
reader.notifyOnDisconnect(self._remove_reader, reader)
|
||||
eu = CiphertextReader(reader, self._storage_index)
|
||||
d = self.start_encrypted(eu)
|
||||
def _done(res):
|
||||
@ -44,6 +56,13 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
|
||||
d.addCallback(_done)
|
||||
return d
|
||||
|
||||
def _remove_reader(self, reader):
    """Forget a reader (registered as its notifyOnDisconnect callback).

    When the last reader is gone and the upload has not already been
    marked finished, finished(None) is invoked.
    """
    # NEEDS MORE
    self._readers.remove(reader)
    if self._readers or self._finished:
        # somebody is still attached, or we already wrapped up
        return
    self.finished(None)
|
||||
|
||||
def finished(self, res):
|
||||
self._finished = True
|
||||
self._helper.upload_finished(self._storage_index)
|
||||
@ -89,24 +108,31 @@ class Helper(Referenceable, service.MultiService):
|
||||
# and send the request off to them. If nobody has it, we'll choose a
|
||||
# helper at random.
|
||||
|
||||
name = "helper"
|
||||
chk_upload_helper_class = CHKUploadHelper
|
||||
|
||||
def __init__(self, basedir):
    """Create the helper service.

    basedir: remembered as self._basedir.
    """
    self._basedir = basedir
    # options dict handed to every upload helper created by
    # remote_upload_chk()
    self._chk_options = {}
    # maps storage_index -> the upload helper currently working on it
    self._active_uploads = {}
    service.MultiService.__init__(self)
|
||||
|
||||
def log(self, *args, **kwargs):
    """Pass a log call through to self.parent.log.

    The "tahoe.helper" facility is filled in when the caller did not
    provide a facility= keyword. Returns whatever self.parent.log
    returns.
    """
    kwargs.setdefault('facility', "tahoe.helper")
    return self.parent.log(*args, **kwargs)
|
||||
|
||||
def remote_upload_chk(self, storage_index):
    """Handle an upload_chk query for the given storage index.

    Reuses the upload helper already registered for this storage index in
    self._active_uploads, or creates (and registers) a new one from
    self.chk_upload_helper_class. Returns the result of the helper's
    start() method.
    """
    lp = self.log(format="helper: upload_chk query for SI %(si)s",
                  si=idlib.b2a(storage_index))
    # TODO: look on disk
    if storage_index not in self._active_uploads:
        self.log("creating new upload helper", parent=lp)
        fresh = self.chk_upload_helper_class(storage_index, self, lp,
                                             self._chk_options)
        self._active_uploads[storage_index] = fresh
        return fresh.start()
    self.log("upload is currently active", parent=lp)
    return self._active_uploads[storage_index].start()
|
||||
|
||||
|
@ -5,6 +5,7 @@ from cStringIO import StringIO
|
||||
from twisted.trial import unittest
|
||||
from twisted.internet import defer, reactor
|
||||
from twisted.internet import threads # CLI tests use deferToThread
|
||||
from twisted.internet.error import ConnectionDone
|
||||
from twisted.application import service
|
||||
from allmydata import client, uri, download, upload, storage, mutable
|
||||
from allmydata.introducer import IntroducerNode
|
||||
@ -12,7 +13,8 @@ from allmydata.util import deferredutil, fileutil, idlib, mathutil, testutil
|
||||
from allmydata.scripts import runner
|
||||
from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI
|
||||
from allmydata.mutable import NotMutableError
|
||||
from foolscap.eventual import flushEventualQueue
|
||||
from foolscap.eventual import fireEventually, flushEventualQueue
|
||||
from foolscap import DeadReferenceError
|
||||
from twisted.python import log
|
||||
from twisted.python.failure import Failure
|
||||
from twisted.web.client import getPage
|
||||
@ -87,6 +89,7 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
|
||||
f = open(os.path.join(basedirs[0],"private","helper.furl"), "r")
|
||||
helper_furl = f.read()
|
||||
f.close()
|
||||
self.helper_furl = helper_furl
|
||||
f = open(os.path.join(basedirs[3],"helper.furl"), "w")
|
||||
f.write(helper_furl)
|
||||
f.close()
|
||||
@ -107,18 +110,27 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
|
||||
d.addCallback(_connected)
|
||||
return d
|
||||
|
||||
def add_extra_node(self, client_num, helper_furl=None,
                   add_to_sparent=False):
    """Create and start one more client node.

    client_num: number used to name the node's base directory
                ("client%d").
    helper_furl: if given, written (plus a newline) to helper.furl in the
                 node's base directory before the client is created.
    add_to_sparent: when true the node is parented to self.sparent;
                    otherwise it is started on its own.

    Returns the result of self.wait_for_connections() with a callback
    appended that replaces the result with the new client.
    """
    # usually this node is *not* parented to our self.sparent, so we can
    # shut it down separately from the rest, to exercise the
    # connection-lost code
    basedir = self.getdir("client%d" % client_num)
    if not os.path.isdir(basedir):
        fileutil.make_dirs(basedir)
    # close the furl files explicitly: the client is created right after
    # this, so the contents must be on disk without relying on the garbage
    # collector to close the handles
    f = open(os.path.join(basedir, "introducer.furl"), "w")
    try:
        f.write(self.introducer_furl)
    finally:
        f.close()
    if helper_furl:
        f = open(os.path.join(basedir, "helper.furl"), "w")
        try:
            f.write(helper_furl+"\n")
        finally:
            f.close()

    c = client.Client(basedir=basedir)
    self.clients.append(c)
    self.numclients += 1
    if add_to_sparent:
        c.setServiceParent(self.sparent)
    else:
        c.startService()
    d = self.wait_for_connections()
    d.addCallback(lambda res: c)
    return d
|
||||
@ -257,10 +269,20 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
|
||||
return d1
|
||||
d.addCallback(_download_nonexistent_uri)
|
||||
|
||||
# add a new node, which doesn't accept shares, and only uses the
|
||||
# helper for upload.
|
||||
d.addCallback(lambda res: self.add_extra_node(self.numclients,
|
||||
self.helper_furl,
|
||||
add_to_sparent=True))
|
||||
def _added(extra_node):
|
||||
self.extra_node = extra_node
|
||||
extra_node.getServiceNamed("storageserver").sizelimit = 0
|
||||
d.addCallback(_added)
|
||||
|
||||
def _upload_with_helper(res):
|
||||
DATA = "Data that needs help to upload" * 1000
|
||||
u = upload.Data(DATA)
|
||||
d = self.clients[3].upload(u)
|
||||
d = self.extra_node.upload(u)
|
||||
def _uploaded(uri):
|
||||
return self.downloader.download_to_data(uri)
|
||||
d.addCallback(_uploaded)
|
||||
@ -270,6 +292,104 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
|
||||
return d
|
||||
d.addCallback(_upload_with_helper)
|
||||
|
||||
def _upload_resumable(res):
|
||||
DATA = "Data that needs help to upload and gets interrupted" * 1000
|
||||
u = upload.Data(DATA)
|
||||
# interrupt the first upload after 5kB
|
||||
print "GOING"
|
||||
from allmydata.util import log
|
||||
options = {"debug_interrupt": 5000,
|
||||
"debug_stash_RemoteEncryptedUploadable": True,
|
||||
}
|
||||
# sneak into the helper and reduce its segment size, so that our
|
||||
# debug_interrupt will sever the connection on about the fifth
|
||||
# segment fetched. This makes sure that we've started to write
|
||||
# the new shares before we abandon them, which exercises the
|
||||
# abort/delete-partial-share code.
|
||||
o2 = {"max_segment_size": 1000}
|
||||
self.clients[0].getServiceNamed("helper")._chk_options = o2
|
||||
|
||||
d = self.extra_node.upload(u, options)
|
||||
def _eee(res):
|
||||
log.msg("EEE: %s" % (res,))
|
||||
print "EEE", res
|
||||
d2 = defer.Deferred()
|
||||
reactor.callLater(3, d2.callback, None)
|
||||
return d2
|
||||
#d.addBoth(_eee)
|
||||
#return d
|
||||
|
||||
def _should_not_finish(res):
|
||||
self.fail("interrupted upload should have failed, not finished"
|
||||
" with result %s" % (res,))
|
||||
def _interrupted(f):
|
||||
print "interrupted"
|
||||
log.msg("interrupted", level=log.WEIRD, failure=f)
|
||||
f.trap(ConnectionDone, DeadReferenceError)
|
||||
reu = options["RemoteEncryptedUploabable"]
|
||||
print "REU.bytes", reu._bytes_read
|
||||
# make sure we actually interrupted it before finishing the
|
||||
# file
|
||||
self.failUnless(reu._bytes_read < len(DATA),
|
||||
"read %d out of %d total" % (reu._bytes_read,
|
||||
len(DATA)))
|
||||
log.msg("waiting for reconnect", level=log.WEIRD)
|
||||
# now, we need to give the nodes a chance to notice that this
|
||||
# connection has gone away. When this happens, the storage
|
||||
# servers will be told to abort their uploads, removing the
|
||||
# partial shares. Unfortunately this involves TCP messages
|
||||
# going through the loopback interface, and we can't easily
|
||||
# predict how long that will take. If it were all local, we
|
||||
# could use fireEventually() to stall. Since we don't have
|
||||
# the right introduction hooks, the best we can do is use a
|
||||
# fixed delay. TODO: this is fragile.
|
||||
return self.stall(None, 2.0)
|
||||
d.addCallbacks(_should_not_finish, _interrupted)
|
||||
|
||||
def _disconnected(res):
|
||||
# check to make sure the storage servers aren't still hanging
|
||||
# on to the partial share: their incoming/ directories should
|
||||
# now be empty.
|
||||
print "disconnected"
|
||||
log.msg("disconnected", level=log.WEIRD)
|
||||
for i in range(self.numclients):
|
||||
incdir = os.path.join(self.getdir("client%d" % i),
|
||||
"storage", "shares", "incoming")
|
||||
self.failUnlessEqual(os.listdir(incdir), [])
|
||||
d.addCallback(_disconnected)
|
||||
|
||||
def _wait_for_reconnect(res):
|
||||
# then we need to give the reconnector a chance to
|
||||
# reestablish the connection to the helper.
|
||||
d.addCallback(lambda res: log.msg("wait_for_connections",
|
||||
level=log.WEIRD))
|
||||
d.addCallback(lambda res: self.wait_for_connections())
|
||||
d.addCallback(_wait_for_reconnect)
|
||||
options2 = {"debug_stash_RemoteEncryptedUploadable": True}
|
||||
def _upload_again(res):
|
||||
print "uploading again"
|
||||
log.msg("uploading again", level=log.WEIRD)
|
||||
return self.extra_node.upload(u, options2)
|
||||
d.addCallbacks(_upload_again)
|
||||
|
||||
def _uploaded(uri):
|
||||
log.msg("I think its uploaded", level=log.WEIRD)
|
||||
print "I tunk its uploaded", uri
|
||||
reu = options2["RemoteEncryptedUploabable"]
|
||||
print "REU.bytes", reu._bytes_read
|
||||
# make sure we didn't read the whole file the second time
|
||||
# around
|
||||
self.failUnless(reu._bytes_read < len(DATA),
|
||||
"resumption didn't save us any work: read %d bytes out of %d total" %
|
||||
(reu._bytes_read, len(DATA)))
|
||||
return self.downloader.download_to_data(uri)
|
||||
d.addCallback(_uploaded)
|
||||
def _check(newdata):
|
||||
self.failUnlessEqual(newdata, DATA)
|
||||
d.addCallback(_check)
|
||||
return d
|
||||
#d.addCallback(_upload_resumable)
|
||||
|
||||
return d
|
||||
test_upload_and_download.timeout = 4800
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user