diff --git a/newsfragments/3939.bugfix b/newsfragments/3939.bugfix
new file mode 100644
index 000000000..9d2071d32
--- /dev/null
+++ b/newsfragments/3939.bugfix
@@ -0,0 +1 @@
+Uploading immutables will now better use available bandwidth, which should allow for faster uploads in many cases.
\ No newline at end of file
diff --git a/src/allmydata/immutable/encode.py b/src/allmydata/immutable/encode.py
index 874492785..2414527ff 100644
--- a/src/allmydata/immutable/encode.py
+++ b/src/allmydata/immutable/encode.py
@@ -262,6 +262,8 @@ class Encoder(object):
 
         d.addCallback(lambda res: self.finish_hashing())
 
+        # These calls have to happen in order; layout.py now requires writes to
+        # be appended to the data written so far.
         d.addCallback(lambda res:
                       self.send_crypttext_hash_tree_to_all_shareholders())
         d.addCallback(lambda res: self.send_all_block_hash_trees())
diff --git a/src/allmydata/immutable/layout.py b/src/allmydata/immutable/layout.py
index d552d43c4..9154f2f30 100644
--- a/src/allmydata/immutable/layout.py
+++ b/src/allmydata/immutable/layout.py
@@ -1,21 +1,18 @@
 """
 Ported to Python 3.
 """
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-from future.utils import PY2
-if PY2:
-    from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min  # noqa: F401
+from __future__ import annotations
 
 import struct
+from io import BytesIO
+
+from attrs import define, field
 
 from zope.interface import implementer
 from twisted.internet import defer
 from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
      FileTooLargeError, HASH_SIZE
-from allmydata.util import mathutil, observer, pipeline, log
+from allmydata.util import mathutil, observer, log
 from allmydata.util.assertutil import precondition
 from allmydata.storage.server import si_b2a
 
@@ -107,19 +104,58 @@ def make_write_bucket_proxy(rref, server, num_share_hashes, uri_extension_size)
     return wbp
 
+
+@define
+class _WriteBuffer:
+    """
+    Queue up small writes to be written in a single batched larger write.
+    """
+    _batch_size: int
+    _to_write: BytesIO = field(factory=BytesIO)
+    _written_bytes: int = field(default=0)
+
+    def queue_write(self, data: bytes) -> bool:
+        """
+        Queue a write. If the result is ``False``, no further action is needed
+        for now. If the result is ``True``, it's time to call ``flush()`` and
+        do a real write.
+        """
+        self._to_write.write(data)
+        return self.get_queued_bytes() >= self._batch_size
+
+    def flush(self) -> tuple[int, bytes]:
+        """Return offset and data to be written."""
+        offset = self._written_bytes
+        data = self._to_write.getvalue()
+        self._written_bytes += len(data)
+        self._to_write = BytesIO()
+        return (offset, data)
+
+    def get_queued_bytes(self) -> int:
+        """Return number of queued, unwritten bytes."""
+        return self._to_write.tell()
+
+    def get_total_bytes(self) -> int:
+        """Return how many bytes were written or queued in total."""
+        return self._written_bytes + self.get_queued_bytes()
+
+
 @implementer(IStorageBucketWriter)
 class WriteBucketProxy(object):
+    """
+    Note: The various ``put_`` methods need to be called in the order in which the
+    bytes will get written.
+ """ fieldsize = 4 fieldstruct = ">L" def __init__(self, rref, server, data_size, block_size, num_segments, - num_share_hashes, uri_extension_size, pipeline_size=50000): + num_share_hashes, uri_extension_size, batch_size=1_000_000): self._rref = rref self._server = server self._data_size = data_size self._block_size = block_size self._num_segments = num_segments - self._written_bytes = 0 effective_segments = mathutil.next_power_of_k(num_segments,2) self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE @@ -130,11 +166,13 @@ class WriteBucketProxy(object): self._create_offsets(block_size, data_size) - # k=3, max_segment_size=128KiB gives us a typical segment of 43691 - # bytes. Setting the default pipeline_size to 50KB lets us get two - # segments onto the wire but not a third, which would keep the pipe - # filled. - self._pipeline = pipeline.Pipeline(pipeline_size) + # With a ~1MB batch size, max upload speed is 1MB/(round-trip latency) + # assuming the writing code waits for writes to finish, so 20MB/sec if + # latency is 50ms. In the US many people only have 1MB/sec upload speed + # as of 2022 (standard Comcast). For further discussion of how one + # might set batch sizes see + # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3787#comment:1. + self._write_buffer = _WriteBuffer(batch_size) def get_allocated_size(self): return (self._offsets['uri_extension'] + self.fieldsize + @@ -179,7 +217,7 @@ class WriteBucketProxy(object): return "" % self._server.get_name() def put_header(self): - return self._write(0, self._offset_data) + return self._queue_write(0, self._offset_data) def put_block(self, segmentnum, data): offset = self._offsets['data'] + segmentnum * self._block_size @@ -193,13 +231,13 @@ class WriteBucketProxy(object): (self._block_size * (self._num_segments - 1))), len(data), self._block_size) - return self._write(offset, data) + return self._queue_write(offset, data) def put_crypttext_hashes(self, hashes): # plaintext_hash_tree precedes crypttext_hash_tree. It is not used, and # so is not explicitly written, but we need to write everything, so # fill it in with nulls. 
-        d = self._write(self._offsets['plaintext_hash_tree'], b"\x00" * self._segment_hash_size)
+        d = self._queue_write(self._offsets['plaintext_hash_tree'], b"\x00" * self._segment_hash_size)
         d.addCallback(lambda _: self._really_put_crypttext_hashes(hashes))
         return d
 
@@ -212,7 +250,7 @@
         precondition(offset + len(data) <= self._offsets['block_hashes'],
                      offset, len(data), offset+len(data),
                      self._offsets['block_hashes'])
-        return self._write(offset, data)
+        return self._queue_write(offset, data)
 
     def put_block_hashes(self, blockhashes):
         offset = self._offsets['block_hashes']
@@ -223,7 +261,7 @@
         precondition(offset + len(data) <= self._offsets['share_hashes'],
                      offset, len(data), offset+len(data),
                      self._offsets['share_hashes'])
-        return self._write(offset, data)
+        return self._queue_write(offset, data)
 
     def put_share_hashes(self, sharehashes):
         # sharehashes is a list of (index, hash) tuples, so they get stored
@@ -237,29 +275,45 @@
         precondition(offset + len(data) <= self._offsets['uri_extension'],
                      offset, len(data), offset+len(data),
                      self._offsets['uri_extension'])
-        return self._write(offset, data)
+        return self._queue_write(offset, data)
 
     def put_uri_extension(self, data):
         offset = self._offsets['uri_extension']
         assert isinstance(data, bytes)
         precondition(len(data) == self._uri_extension_size)
         length = struct.pack(self.fieldstruct, len(data))
-        return self._write(offset, length+data)
+        return self._queue_write(offset, length+data)
 
-    def _write(self, offset, data):
-        # use a Pipeline to pipeline several writes together. TODO: another
-        # speedup would be to coalesce small writes into a single call: this
-        # would reduce the foolscap CPU overhead per share, but wouldn't
-        # reduce the number of round trips, so it might not be worth the
-        # effort.
-        self._written_bytes += len(data)
-        return self._pipeline.add(len(data),
-                                  self._rref.callRemote, "write", offset, data)
+    def _queue_write(self, offset, data):
+        """
+        This queues up small writes to be written in a single batched larger
+        write.
+
+        Callers of this function are expected to queue the data in order, with
+        no holes. As such, the offset is technically unnecessary, but is used
+        to check the inputs. Possibly we should get rid of it.
+        """
+        assert offset == self._write_buffer.get_total_bytes()
+        if self._write_buffer.queue_write(data):
+            return self._actually_write()
+        else:
+            return defer.succeed(False)
+
+    def _actually_write(self):
+        """Write data to the server."""
+        offset, data = self._write_buffer.flush()
+        return self._rref.callRemote("write", offset, data)
 
     def close(self):
-        assert self._written_bytes == self.get_allocated_size(), f"{self._written_bytes} != {self.get_allocated_size()}"
-        d = self._pipeline.add(0, self._rref.callRemote, "close")
-        d.addCallback(lambda ign: self._pipeline.flush())
+        assert self._write_buffer.get_total_bytes() == self.get_allocated_size(), (
+            f"{self._write_buffer.get_total_bytes()} != {self.get_allocated_size()}"
+        )
+        if self._write_buffer.get_queued_bytes() > 0:
+            d = self._actually_write()
+        else:
+            # No data queued, don't send an empty write.
+            d = defer.succeed(True)
+        d.addCallback(lambda _: self._rref.callRemote("close"))
         return d
 
     def abort(self):
@@ -371,16 +425,16 @@ class ReadBucketProxy(object):
         self._fieldsize = fieldsize
         self._fieldstruct = fieldstruct
 
-        for field in ( 'data',
-                       'plaintext_hash_tree', # UNUSED
-                       'crypttext_hash_tree',
-                       'block_hashes',
-                       'share_hashes',
-                       'uri_extension',
-                       ):
+        for field_name in ( 'data',
+                            'plaintext_hash_tree', # UNUSED
+                            'crypttext_hash_tree',
+                            'block_hashes',
+                            'share_hashes',
+                            'uri_extension',
+                            ):
             offset = struct.unpack(fieldstruct, data[x:x+fieldsize])[0]
             x += fieldsize
-            self._offsets[field] = offset
+            self._offsets[field_name] = offset
         return self._offsets
 
     def _get_block_data(self, unused, blocknum, blocksize, thisblocksize):
diff --git a/src/allmydata/test/test_pipeline.py b/src/allmydata/test/test_pipeline.py
deleted file mode 100644
index 31d952836..000000000
--- a/src/allmydata/test/test_pipeline.py
+++ /dev/null
@@ -1,198 +0,0 @@
-"""
-Tests for allmydata.util.pipeline.
-
-Ported to Python 3.
-"""
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-
-from future.utils import PY2
-if PY2:
-    from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min  # noqa: F401
-
-import gc
-
-from twisted.internet import defer
-from twisted.trial import unittest
-from twisted.python import log
-from twisted.python.failure import Failure
-
-from allmydata.util import pipeline
-
-
-class Pipeline(unittest.TestCase):
-    def pause(self, *args, **kwargs):
-        d = defer.Deferred()
-        self.calls.append( (d, args, kwargs) )
-        return d
-
-    def failUnlessCallsAre(self, expected):
-        #print(self.calls)
-        #print(expected)
-        self.failUnlessEqual(len(self.calls), len(expected), self.calls)
-        for i,c in enumerate(self.calls):
-            self.failUnlessEqual(c[1:], expected[i], str(i))
-
-    def test_basic(self):
-        self.calls = []
-        finished = []
-        p = pipeline.Pipeline(100)
-
-        d = p.flush() # fires immediately
-        d.addCallbacks(finished.append, log.err)
-        self.failUnlessEqual(len(finished), 1)
-        finished = []
-
-        d = p.add(10, self.pause, "one")
-        # the call should start right away, and our return Deferred should
-        # fire right away
-        d.addCallbacks(finished.append, log.err)
-        self.failUnlessEqual(len(finished), 1)
-        self.failUnlessEqual(finished[0], None)
-        self.failUnlessCallsAre([ ( ("one",) , {} ) ])
-        self.failUnlessEqual(p.gauge, 10)
-
-        # pipeline: [one]
-
-        finished = []
-        d = p.add(20, self.pause, "two", kw=2)
-        # pipeline: [one, two]
-
-        # the call and the Deferred should fire right away
-        d.addCallbacks(finished.append, log.err)
-        self.failUnlessEqual(len(finished), 1)
-        self.failUnlessEqual(finished[0], None)
-        self.failUnlessCallsAre([ ( ("one",) , {} ),
-                                  ( ("two",) , {"kw": 2} ),
-                                  ])
-        self.failUnlessEqual(p.gauge, 30)
-
-        self.calls[0][0].callback("one-result")
-        # pipeline: [two]
-        self.failUnlessEqual(p.gauge, 20)
-
-        finished = []
-        d = p.add(90, self.pause, "three", "posarg1")
-        # pipeline: [two, three]
-        flushed = []
-        fd = p.flush()
-        fd.addCallbacks(flushed.append, log.err)
-        self.failUnlessEqual(flushed, [])
-
-        # the call will be made right away, but the return Deferred will not,
-        # because the pipeline is now full.
-        d.addCallbacks(finished.append, log.err)
-        self.failUnlessEqual(len(finished), 0)
-        self.failUnlessCallsAre([ ( ("one",) , {} ),
-                                  ( ("two",) , {"kw": 2} ),
-                                  ( ("three", "posarg1"), {} ),
-                                  ])
-        self.failUnlessEqual(p.gauge, 110)
-
-        self.failUnlessRaises(pipeline.SingleFileError, p.add, 10, self.pause)
-
-        # retiring either call will unblock the pipeline, causing the #3
-        # Deferred to fire
-        self.calls[2][0].callback("three-result")
-        # pipeline: [two]
-
-        self.failUnlessEqual(len(finished), 1)
-        self.failUnlessEqual(finished[0], None)
-        self.failUnlessEqual(flushed, [])
-
-        # retiring call#2 will finally allow the flush() Deferred to fire
-        self.calls[1][0].callback("two-result")
-        self.failUnlessEqual(len(flushed), 1)
-
-    def test_errors(self):
-        self.calls = []
-        p = pipeline.Pipeline(100)
-
-        d1 = p.add(200, self.pause, "one")
-        d2 = p.flush()
-
-        finished = []
-        d1.addBoth(finished.append)
-        self.failUnlessEqual(finished, [])
-
-        flushed = []
-        d2.addBoth(flushed.append)
-        self.failUnlessEqual(flushed, [])
-
-        self.calls[0][0].errback(ValueError("oops"))
-
-        self.failUnlessEqual(len(finished), 1)
-        f = finished[0]
-        self.failUnless(isinstance(f, Failure))
-        self.failUnless(f.check(pipeline.PipelineError))
-        self.failUnlessIn("PipelineError", str(f.value))
-        self.failUnlessIn("ValueError", str(f.value))
-        r = repr(f.value)
-        self.failUnless("ValueError" in r, r)
-        f2 = f.value.error
-        self.failUnless(f2.check(ValueError))
-
-        self.failUnlessEqual(len(flushed), 1)
-        f = flushed[0]
-        self.failUnless(isinstance(f, Failure))
-        self.failUnless(f.check(pipeline.PipelineError))
-        f2 = f.value.error
-        self.failUnless(f2.check(ValueError))
-
-        # now that the pipeline is in the failed state, any new calls will
-        # fail immediately
-
-        d3 = p.add(20, self.pause, "two")
-
-        finished = []
-        d3.addBoth(finished.append)
-        self.failUnlessEqual(len(finished), 1)
-        f = finished[0]
-        self.failUnless(isinstance(f, Failure))
-        self.failUnless(f.check(pipeline.PipelineError))
-        r = repr(f.value)
-        self.failUnless("ValueError" in r, r)
-        f2 = f.value.error
-        self.failUnless(f2.check(ValueError))
-
-        d4 = p.flush()
-        flushed = []
-        d4.addBoth(flushed.append)
-        self.failUnlessEqual(len(flushed), 1)
-        f = flushed[0]
-        self.failUnless(isinstance(f, Failure))
-        self.failUnless(f.check(pipeline.PipelineError))
-        f2 = f.value.error
-        self.failUnless(f2.check(ValueError))
-
-    def test_errors2(self):
-        self.calls = []
-        p = pipeline.Pipeline(100)
-
-        d1 = p.add(10, self.pause, "one")
-        d2 = p.add(20, self.pause, "two")
-        d3 = p.add(30, self.pause, "three")
-        d4 = p.flush()
-
-        # one call fails, then the second one succeeds: make sure
-        # ExpandableDeferredList tolerates the second one
-
-        flushed = []
-        d4.addBoth(flushed.append)
-        self.failUnlessEqual(flushed, [])
-
-        self.calls[0][0].errback(ValueError("oops"))
-        self.failUnlessEqual(len(flushed), 1)
-        f = flushed[0]
-        self.failUnless(isinstance(f, Failure))
-        self.failUnless(f.check(pipeline.PipelineError))
-        f2 = f.value.error
-        self.failUnless(f2.check(ValueError))
-
-        self.calls[1][0].callback("two-result")
-        self.calls[2][0].errback(ValueError("three-error"))
-
-        del d1,d2,d3,d4
-        gc.collect() # for PyPy
diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py
index 134609f81..9b9d2d8de 100644
--- a/src/allmydata/test/test_storage.py
+++ b/src/allmydata/test/test_storage.py
@@ -3,14 +3,9 @@ Tests for allmydata.storage.
 
 Ported to Python 3.
""" -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function -from __future__ import unicode_literals -from future.utils import native_str, PY2, bytes_to_native_str, bchr -if PY2: - from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401 +from __future__ import annotations +from future.utils import native_str, bytes_to_native_str, bchr from six import ensure_str from io import ( @@ -59,7 +54,7 @@ from allmydata.storage.common import storage_index_to_dir, \ si_b2a, si_a2b from allmydata.storage.lease import LeaseInfo from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \ - ReadBucketProxy + ReadBucketProxy, _WriteBuffer from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \ LayoutInvalid, MDMFSIGNABLEHEADER, \ SIGNED_PREFIX, MDMFHEADER, \ @@ -3746,3 +3741,39 @@ class LeaseInfoTests(SyncTestCase): info.to_mutable_data(), HasLength(info.mutable_size()), ) + + +class WriteBufferTests(SyncTestCase): + """Tests for ``_WriteBuffer``.""" + + @given( + small_writes=strategies.lists( + strategies.binary(min_size=1, max_size=20), + min_size=10, max_size=20), + batch_size=strategies.integers(min_value=5, max_value=10) + ) + def test_write_buffer(self, small_writes: list[bytes], batch_size: int): + """ + ``_WriteBuffer`` coalesces small writes into bigger writes based on + the batch size. + """ + wb = _WriteBuffer(batch_size) + result = b"" + for data in small_writes: + should_flush = wb.queue_write(data) + if should_flush: + flushed_offset, flushed_data = wb.flush() + self.assertEqual(flushed_offset, len(result)) + # The flushed data is in batch sizes, or closest approximation + # given queued inputs: + self.assertTrue(batch_size <= len(flushed_data) < batch_size + len(data)) + result += flushed_data + + # Final flush: + remaining_length = wb.get_queued_bytes() + flushed_offset, flushed_data = wb.flush() + self.assertEqual(remaining_length, len(flushed_data)) + self.assertEqual(flushed_offset, len(result)) + result += flushed_data + + self.assertEqual(result, b"".join(small_writes)) diff --git a/src/allmydata/util/pipeline.py b/src/allmydata/util/pipeline.py deleted file mode 100644 index 31f5d5d49..000000000 --- a/src/allmydata/util/pipeline.py +++ /dev/null @@ -1,149 +0,0 @@ -""" -A pipeline of Deferreds. - -Ported to Python 3. -""" - -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function -from __future__ import unicode_literals - -from future.utils import PY2 -if PY2: - from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401 - -from twisted.internet import defer -from twisted.python.failure import Failure -from twisted.python import log -from allmydata.util.assertutil import precondition - - -class PipelineError(Exception): - """One of the pipelined messages returned an error. 
-    object is stored in my .error attribute."""
-    def __init__(self, error):
-        self.error = error
-
-    def __repr__(self):
-        return "<PipelineError error=(%r)>" % (self.error,)
-    def __str__(self):
-        return "<PipelineError error=(%s)>" % (self.error,)
-
-class SingleFileError(Exception):
-    """You are not permitted to add a job to a full pipeline."""
-
-
-class ExpandableDeferredList(defer.Deferred, object):
-    # like DeferredList(fireOnOneErrback=True) with a built-in
-    # gatherResults(), but you can add new Deferreds until you close it. This
-    # gives you a chance to add don't-complain-about-unhandled-error errbacks
-    # immediately after attachment, regardless of whether you actually end up
-    # wanting the list or not.
-    def __init__(self):
-        defer.Deferred.__init__(self)
-        self.resultsReceived = 0
-        self.resultList = []
-        self.failure = None
-        self.closed = False
-
-    def addDeferred(self, d):
-        precondition(not self.closed, "don't call addDeferred() on a closed ExpandableDeferredList")
-        index = len(self.resultList)
-        self.resultList.append(None)
-        d.addCallbacks(self._cbDeferred, self._ebDeferred,
-                       callbackArgs=(index,))
-        return d
-
-    def close(self):
-        self.closed = True
-        self.checkForFinished()
-
-    def checkForFinished(self):
-        if not self.closed:
-            return
-        if self.called:
-            return
-        if self.failure:
-            self.errback(self.failure)
-        elif self.resultsReceived == len(self.resultList):
-            self.callback(self.resultList)
-
-    def _cbDeferred(self, res, index):
-        self.resultList[index] = res
-        self.resultsReceived += 1
-        self.checkForFinished()
-        return res
-
-    def _ebDeferred(self, f):
-        self.failure = f
-        self.checkForFinished()
-        return f
-
-
-class Pipeline(object):
-    """I manage a size-limited pipeline of Deferred operations, usually
-    callRemote() messages."""
-
-    def __init__(self, capacity):
-        self.capacity = capacity # how full we can be
-        self.gauge = 0 # how full we are
-        self.failure = None
-        self.waiting = [] # callers of add() who are blocked
-        self.unflushed = ExpandableDeferredList()
-
-    def add(self, _size, _func, *args, **kwargs):
-        # We promise that all the Deferreds we return will fire in the order
-        # they were returned. To make it easier to keep this promise, we
-        # prohibit multiple outstanding calls to add() .
-        if self.waiting:
-            raise SingleFileError
-        if self.failure:
-            return defer.fail(self.failure)
-        self.gauge += _size
-        fd = defer.maybeDeferred(_func, *args, **kwargs)
-        fd.addBoth(self._call_finished, _size)
-        self.unflushed.addDeferred(fd)
-        fd.addErrback(self._eat_pipeline_errors)
-        fd.addErrback(log.err, "_eat_pipeline_errors didn't eat it")
-        if self.gauge < self.capacity:
-            return defer.succeed(None)
-        d = defer.Deferred()
-        self.waiting.append(d)
-        return d
-
-    def flush(self):
-        if self.failure:
-            return defer.fail(self.failure)
-        d, self.unflushed = self.unflushed, ExpandableDeferredList()
-        d.close()
-        d.addErrback(self._flushed_error)
-        return d
-
-    def _flushed_error(self, f):
-        precondition(self.failure) # should have been set by _call_finished
-        return self.failure
-
-    def _call_finished(self, res, size):
-        self.gauge -= size
-        if isinstance(res, Failure):
-            res = Failure(PipelineError(res))
-        if not self.failure:
-            self.failure = res
-        if self.failure:
-            while self.waiting:
-                d = self.waiting.pop(0)
-                d.errback(self.failure)
-        else:
-            while self.waiting and (self.gauge < self.capacity):
-                d = self.waiting.pop(0)
-                d.callback(None)
-                # the d.callback() might trigger a new call to add(), which
-                # will raise our gauge and might cause the pipeline to be
-                # filled. So the while() loop gets a chance to tell the
-                # caller to stop.
-        return res
-
-    def _eat_pipeline_errors(self, f):
-        f.trap(PipelineError)
-        return None
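
For illustration, here is a minimal, runnable sketch (not part of the patch) of how the new ``_WriteBuffer`` coalesces small writes. It assumes a tahoe-lafs checkout with this patch applied (``_WriteBuffer`` is private; it is imported here the same way test_storage.py above imports it), and the 10-byte batch size is hypothetical, chosen only to keep the example short, where the patch defaults to 1_000_000 bytes.

# Sketch: batching behavior of _WriteBuffer (assumes this patch is applied).
from allmydata.immutable.layout import _WriteBuffer

wb = _WriteBuffer(10)  # hypothetical tiny batch size, for illustration only

# Small writes accumulate in memory; queue_write() returns False until the
# buffered bytes reach the batch size, so no remote call happens yet.
assert wb.queue_write(b"aaaa") is False
assert wb.queue_write(b"bbbb") is False

# The third write crosses the 10-byte threshold: the caller flushes, and all
# twelve bytes go out as one batched remote "write" call at offset 0.
assert wb.queue_write(b"cccc") is True
assert wb.flush() == (0, b"aaaabbbbcccc")

# The next flush resumes at the offset where the previous one ended, which is
# why WriteBucketProxy._queue_write can assert offset == get_total_bytes().
wb.queue_write(b"tail")
assert wb.flush() == (12, b"tail")

# Rough throughput intuition for the real 1 MB default: if each batched write
# waits one round trip, throughput is about batch_size / latency, e.g.
# 1 MB / 0.05 s = 20 MB/s at 50 ms latency, the figure cited in the comment
# in WriteBucketProxy.__init__ above.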