upload: add Encoder.abort(), to abandon the upload in progress. Add some debug hooks to enable unit tests.

Brian Warner 2008-01-14 21:22:55 -07:00
parent 60090fb9f2
commit a6ca98ac53
6 changed files with 116 additions and 15 deletions
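
In outline, the new API lets a caller abandon an upload that is already in flight. A minimal sketch of a driver (the deadline wrapper and the uploader/eu names are illustrative, not part of this commit; CHKUploader.abort() and its Deferred come from the upload.py changes below):

from twisted.internet import reactor

def upload_with_deadline(uploader, eu, deadline=300):
    # start_encrypted() drives the Encoder; if we abort partway through,
    # this Deferred errbacks (with UploadAborted) once the shareholders
    # have been told to discard their partial shares
    d = uploader.start_encrypted(eu)
    timer = reactor.callLater(deadline, uploader.abort)
    def _stop_timer(res):
        if timer.active():
            timer.cancel()
        return res
    d.addBoth(_stop_timer)
    return d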

View File

@@ -90,6 +90,7 @@ class Encoder(object):
         self._parent = parent
         if self._parent:
             self._log_number = self._parent.log("creating Encoder %s" % self)
+        self._aborted = False

     def __repr__(self):
         if hasattr(self, "_storage_index"):
@@ -263,6 +264,15 @@ class Encoder(object):
         d.addCallbacks(lambda res: self.done(), self.err)
         return d

+    def abort(self):
+        self.log("aborting upload")
+        assert self._codec, "don't call abort before start"
+        self._aborted = True
+        # the next segment read (in _gather_data inside _encode_segment) will
+        # raise UploadAborted(), which will bypass the rest of the upload
+        # chain. If we've sent the final segment's shares, it's too late to
+        # abort. TODO: allow abort any time up to close_all_shareholders.
+
     def _turn_barrier(self, res):
         # putting this method in a Deferred chain imposes a guaranteed
         # reactor turn between the pre- and post- portions of that chain.
@@ -341,11 +351,16 @@ class Encoder(object):
         with the combination of any 'previous_chunks' and the new chunks
         which were gathered."""
+        if self._aborted:
+            raise UploadAborted()
+
         if not num_chunks:
             return defer.succeed(previous_chunks)

         d = self._uploadable.read_encrypted(input_chunk_size)
         def _got(data):
+            if self._aborted:
+                raise UploadAborted()
             encrypted_pieces = []
             length = 0
             while data:
@@ -595,6 +610,19 @@ class Encoder(object):
     def err(self, f):
         self.log("UNUSUAL: %s: upload failed: %s" % (self, f))
+        # we need to abort any remaining shareholders, so they'll delete the
+        # partial share, allowing someone else to upload it again.
+        self.log("aborting shareholders")
+        dl = []
+        for shareid in list(self.landlords.keys()):
+            d = self.landlords[shareid].abort()
+            d.addErrback(self._remove_shareholder, shareid, "abort")
+            dl.append(d)
+        d = self._gather_responses(dl)
+        def _done(res):
+            self.log("shareholders aborted")
-        if f.check(defer.FirstError):
-            return f.value.subFailure
-        return f
+            if f.check(defer.FirstError):
+                return f.value.subFailure
+            return f
+        d.addCallback(_done)
+        return d
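
The design above is cooperative cancellation: abort() only sets a flag, the Deferred-driven segment loop polls it before each read, and err() then fans the abort out to every remaining shareholder. A stripped-down sketch of the same loop shape, with illustrative names (only UploadAborted and the flag check mirror the real code):

from twisted.internet import defer

class UploadAborted(Exception):
    pass

class SegmentLoop:
    def __init__(self, read_segment, num_segments):
        self._aborted = False
        self._read_segment = read_segment  # callable returning a Deferred
        self._remaining = num_segments

    def abort(self):
        # cooperative: any in-flight read completes; the next poll raises
        self._aborted = True

    def run(self):
        if self._aborted:
            return defer.fail(UploadAborted())
        if self._remaining == 0:
            return defer.succeed(None)
        d = self._read_segment()
        def _next(_segment):
            self._remaining -= 1
            return self.run()
        d.addCallback(_next)
        return d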

View File

@@ -78,6 +78,11 @@ class RIBucketWriter(RemoteInterface):
         """
         return None

+    def abort():
+        """Abandon all the data that has been written.
+        """
+        return None
+
 class RIBucketReader(RemoteInterface):
     def read(offset=int, length=int):
         return ShareData
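
RIBucketWriter is a foolscap RemoteInterface (hence no self in the method signatures), so clients reach the new method with callRemote; the WriteBucketProxy.abort() added in storage.py below does exactly this:

# rref is a foolscap RemoteReference to an RIBucketWriter; the Deferred
# fires with None once the server has discarded the partial share
d = rref.callRemote("abort")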

View File

@@ -164,11 +164,13 @@ class ShareFile:
 class BucketWriter(Referenceable):
     implements(RIBucketWriter)

-    def __init__(self, ss, incominghome, finalhome, size, lease_info):
+    def __init__(self, ss, incominghome, finalhome, size, lease_info, canary):
         self.ss = ss
         self.incominghome = incominghome
         self.finalhome = finalhome
         self._size = size
+        self._canary = canary
+        self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected)
         self.closed = False
         self.throw_out_all_data = False
         # touch the file, so later callers will see that we're working on it.
@@ -196,6 +198,7 @@ class BucketWriter(Referenceable):
         fileutil.rename(self.incominghome, self.finalhome)
         self._sharefile = None
         self.closed = True
+        self._canary.dontNotifyOnDisconnect(self._disconnect_marker)

         filelen = os.stat(self.finalhome)[stat.ST_SIZE]
         self.ss.bucket_writer_closed(self, filelen)
@@ -206,6 +209,28 @@
             if not os.listdir(parentdir):
                 os.rmdir(parentdir)

+    def _disconnected(self):
+        if not self.closed:
+            self._abort()
+
+    def remote_abort(self):
+        log.msg("storage: aborting sharefile %s" % self.incominghome,
+                facility="tahoe.storage", level=log.UNUSUAL)
+        if not self.closed:
+            self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
+        self._abort()
+
+    def _abort(self):
+        if self.closed:
+            return
+        os.remove(self.incominghome)
+        # if we were the last share to be moved, remove the incoming/
+        # directory that was our parent
+        parentdir = os.path.split(self.incominghome)[0]
+        if not os.listdir(parentdir):
+            os.rmdir(parentdir)
+
 class BucketReader(Referenceable):
     implements(RIBucketReader)
@@ -721,7 +746,7 @@ class StorageServer(service.MultiService, Referenceable):
                 # ok! we need to create the new share file.
                 fileutil.make_dirs(os.path.join(self.incomingdir, si_s))
                 bw = BucketWriter(self, incominghome, finalhome,
-                                  space_per_bucket, lease_info)
+                                  space_per_bucket, lease_info, canary)
                 if self.no_storage:
                     bw.throw_out_all_data = True
                 bucketwriters[shnum] = bw
@@ -1110,6 +1135,9 @@ class WriteBucketProxy:
     def close(self):
         return self._rref.callRemote("close")

+    def abort(self):
+        return self._rref.callRemote("abort")
+
 class ReadBucketProxy:
     implements(IStorageBucketReader)
     def __init__(self, rref):
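
The canary is the remote reference to the client that was passed into remote_allocate_buckets(); foolscap fires notifyOnDisconnect() when that client's connection is lost, so the partial shares of a crashed uploader are reaped even though no abort message ever arrives. A sketch of the BucketWriter lifecycle as the tests exercise it (FakeCanary stands in for a real remote reference):

canary = FakeCanary()
bw = BucketWriter(ss, incoming, final, 200, lease_info, canary)
bw.remote_write(0, "a" * 200)

# happy path: close() unhooks the disconnect watcher first, so a later
# disconnect can no longer delete the now-final share
bw.remote_close()

# failure paths, for contrast:
#  - the client calls remote_abort(): the incoming file is removed directly
#  - the client vanishes before close(): foolscap fires _disconnected(),
#    which calls _abort() and removes the partial share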

View File

@@ -84,6 +84,9 @@ class FakeBucketWriterProxy:
             self.closed = True
         return defer.maybeDeferred(_try)

+    def abort(self):
+        return defer.succeed(None)
+
     def get_block(self, blocknum):
         def _try():
             assert isinstance(blocknum, (int, long))
@@ -621,7 +624,7 @@ class Roundtrip(unittest.TestCase):
         d = self.send_and_recover((4,8,10), bucket_modes=modemap)
         def _done(res):
             self.failUnless(isinstance(res, Failure))
-            self.failUnless(res.check(encode.NotEnoughPeersError))
+            self.failUnless(res.check(encode.NotEnoughPeersError), res)
         d.addBoth(_done)
         return d

View File

@@ -12,6 +12,12 @@ from allmydata.storage import BucketWriter, BucketReader, \
 from allmydata.interfaces import BadWriteEnablerError
 from allmydata.test.common import LoggingServiceParent

+class FakeCanary:
+    def notifyOnDisconnect(self, *args, **kwargs):
+        pass
+    def dontNotifyOnDisconnect(self, marker):
+        pass
+
 class Bucket(unittest.TestCase):
     def make_workdir(self, name):
         basedir = os.path.join("storage", "Bucket", name)
@@ -33,7 +39,8 @@
     def test_create(self):
         incoming, final = self.make_workdir("test_create")
-        bw = BucketWriter(self, incoming, final, 200, self.make_lease())
+        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
+                          FakeCanary())
         bw.remote_write(0, "a"*25)
         bw.remote_write(25, "b"*25)
         bw.remote_write(50, "c"*25)
@@ -42,7 +49,8 @@
     def test_readwrite(self):
         incoming, final = self.make_workdir("test_readwrite")
-        bw = BucketWriter(self, incoming, final, 200, self.make_lease())
+        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
+                          FakeCanary())
         bw.remote_write(0, "a"*25)
         bw.remote_write(25, "b"*25)
         bw.remote_write(50, "c"*7) # last block may be short
@@ -69,7 +77,8 @@ class BucketProxy(unittest.TestCase):
         final = os.path.join(basedir, "bucket")
         fileutil.make_dirs(basedir)
         fileutil.make_dirs(os.path.join(basedir, "tmp"))
-        bw = BucketWriter(self, incoming, final, size, self.make_lease())
+        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
+                          FakeCanary())
         rb = RemoteBucket()
         rb.target = bw
         return bw, rb, final
@@ -201,7 +210,7 @@ class Server(unittest.TestCase):
         cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
         return ss.remote_allocate_buckets(storage_index,
                                           renew_secret, cancel_secret,
-                                          sharenums, size, Referenceable())
+                                          sharenums, size, FakeCanary())

     def test_remove_incoming(self):
         ss = self.create("test_remove_incoming")
@@ -219,7 +228,7 @@
         self.failUnlessEqual(ss.remote_get_buckets("vid"), {})

-        canary = Referenceable()
+        canary = FakeCanary()
         already,writers = self.allocate(ss, "vid", [0,1,2], 75)
         self.failUnlessEqual(already, set())
         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
@@ -253,7 +262,7 @@ class Server(unittest.TestCase):
     def test_sizelimits(self):
         ss = self.create("test_sizelimits", 5000)
-        canary = Referenceable()
+        canary = FakeCanary()
         # a newly created and filled share incurs this much overhead, beyond
         # the size we request.
         OVERHEAD = 3*4
@@ -336,7 +345,7 @@
     def test_leases(self):
         ss = self.create("test_leases")
-        canary = Referenceable()
+        canary = FakeCanary()
         sharenums = range(5)
         size = 100
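
The Referenceable-to-FakeCanary swaps are forced by the constructor change: BucketWriter.__init__ now calls canary.notifyOnDisconnect(), and a plain foolscap Referenceable (previously just a placeholder here) has no such method, since disconnect notification lives on remote references. Roughly what would happen without the stub (hypothetical reproduction, not a test from this commit):

from foolscap import Referenceable
bw = BucketWriter(ss, incoming, final, 200, lease_info, Referenceable())
# -> AttributeError: no attribute 'notifyOnDisconnect'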

View File

@@ -436,6 +436,7 @@ class CHKUploader:
         self._client = client
         self._options = options
         self._log_number = self._client.log("CHKUploader starting")
+        self._encoder = None

     def set_params(self, encoding_parameters):
         self._encoding_parameters = encoding_parameters
@@ -465,10 +466,19 @@
         d.addCallback(_uploaded)
         return d

+    def abort(self):
+        """Call this if the upload must be abandoned before it completes.
+        This will tell the shareholders to delete their partial shares. I
+        return a Deferred that fires when these messages have been acked."""
+        if not self._encoder:
+            # how did you call abort() before calling start() ?
+            return defer.succeed(None)
+        return self._encoder.abort()
+
     def start_encrypted(self, encrypted):
         eu = IEncryptedUploadable(encrypted)
-        e = encode.Encoder(self._options, self)
+        self._encoder = e = encode.Encoder(self._options, self)
         e.set_params(self._encoding_parameters)
         d = e.set_encrypted_uploadable(eu)
         d.addCallback(self.locate_all_shareholders)
@@ -562,6 +572,9 @@ class RemoteEncryptedUploabable(Referenceable):
     def __init__(self, encrypted_uploadable):
         self._eu = IEncryptedUploadable(encrypted_uploadable)
         self._offset = 0
+        self._bytes_read = 0
+        self._cutoff = None # set by debug options
+        self._cutoff_cb = None

     def remote_get_size(self):
         return self._eu.get_size()
@@ -570,9 +583,13 @@
     def remote_read_encrypted(self, offset, length):
         # we don't yet implement seek
         assert offset == self._offset, "%d != %d" % (offset, self._offset)
+        if self._cutoff is not None and offset+length > self._cutoff:
+            self._cutoff_cb()
         d = self._eu.read_encrypted(length)
         def _read(strings):
-            self._offset += sum([len(data) for data in strings])
+            size = sum([len(data) for data in strings])
+            self._bytes_read += size
+            self._offset += size
             return strings
         d.addCallback(_read)
         return d
@@ -636,6 +653,17 @@ class AssistedUploader:
             self.log("helper says we need to upload")
             # we need to upload the file
             reu = RemoteEncryptedUploabable(self._encuploadable)
+            if "debug_stash_RemoteEncryptedUploadable" in self._options:
+                self._options["RemoteEncryptedUploabable"] = reu
+            if "debug_interrupt" in self._options:
+                reu._cutoff = self._options["debug_interrupt"]
+                def _cutoff():
+                    # simulate the loss of the connection to the helper
+                    self.log("debug_interrupt killing connection to helper",
+                             level=log.WEIRD)
+                    upload_helper.tracker.broker.transport.loseConnection()
+                    return
+                reu._cutoff_cb = _cutoff
             d = upload_helper.callRemote("upload", reu)
             # this Deferred will fire with the upload results
             return d
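
The cutoff check fires before the read that would cross the configured byte threshold, so a test can deterministically sever the helper connection mid-upload, and _bytes_read records how far the transfer got. Wiring the hooks by hand looks roughly like this (a sketch; only the _cutoff, _cutoff_cb, and _bytes_read attributes come from this commit):

reu = RemoteEncryptedUploabable(encrypted_uploadable)
reu._cutoff = 5000  # trip once a read would pass 5000 bytes
def _interrupt():
    # the real hook severs the transport with loseConnection(); a test
    # could instead just record that the interruption point was reached
    print("interrupted after %d bytes" % reu._bytes_read)
reu._cutoff_cb = _interrupt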