Merge pull request #1231 from tahoe-lafs/3939-faster-http-protocol

Faster HTTP protocol, part 1 (and maybe faster Foolscap too, while we're at it)

Fixes ticket:3939
Itamar Turner-Trauring 2022-12-05 14:05:55 -05:00 committed by GitHub
commit 1eba202c08
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 138 additions and 397 deletions

View File

@@ -0,0 +1 @@
+Uploading immutables will now better use available bandwidth, which should allow for faster uploads in many cases.
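
A rough sketch of the expected gain (not part of the patch): if each batched write waits for a full round trip before the next goes out, throughput is capped at batch size divided by round-trip latency. Plugging in the numbers from the comment added to layout.py below:

    # Back-of-envelope ceilings, assuming one write in flight at a time.
    rtt = 0.05                    # 50ms round-trip latency (assumed)
    new_batch_size = 1_000_000    # new default batch size in this commit
    old_window = 50_000           # the old Pipeline's 50KB window

    print(new_batch_size / rtt)   # 20,000,000 bytes/sec (~20MB/sec)
    print(old_window / rtt)       # 1,000,000 bytes/sec (~1MB/sec)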

View File

@@ -262,6 +262,8 @@ class Encoder(object):
         d.addCallback(lambda res: self.finish_hashing())
+        # These calls have to happen in order; layout.py now requires writes to
+        # be appended to the data written so far.
         d.addCallback(lambda res:
                       self.send_crypttext_hash_tree_to_all_shareholders())
         d.addCallback(lambda res: self.send_all_block_hash_trees())
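
Why the ordering now matters: the new _queue_write() in layout.py, shown below, tracks one contiguous stream of bytes per share and asserts that each write starts exactly where the previous one ended:

    # From _queue_write() in layout.py below; out-of-order or overlapping
    # writes now fail this check instead of being transparently pipelined.
    assert offset == self._write_buffer.get_total_bytes()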

View File

@@ -1,21 +1,18 @@
 """
 Ported to Python 3.
 """

-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-from future.utils import PY2
-if PY2:
-    from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min  # noqa: F401
+from __future__ import annotations

 import struct
+from io import BytesIO
+
+from attrs import define, field

 from zope.interface import implementer
 from twisted.internet import defer
 from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
      FileTooLargeError, HASH_SIZE
-from allmydata.util import mathutil, observer, pipeline, log
+from allmydata.util import mathutil, observer, log
 from allmydata.util.assertutil import precondition
 from allmydata.storage.server import si_b2a
@@ -107,19 +104,58 @@ def make_write_bucket_proxy(rref, server,
                                   num_share_hashes, uri_extension_size)
     return wbp
+
+@define
+class _WriteBuffer:
+    """
+    Queue up small writes to be written in a single batched larger write.
+    """
+    _batch_size: int
+    _to_write: BytesIO = field(factory=BytesIO)
+    _written_bytes: int = field(default=0)
+
+    def queue_write(self, data: bytes) -> bool:
+        """
+        Queue a write. If the result is ``False``, no further action is needed
+        for now. If the result is ``True``, it's time to call ``flush()`` and
+        do a real write.
+        """
+        self._to_write.write(data)
+        return self.get_queued_bytes() >= self._batch_size
+
+    def flush(self) -> tuple[int, bytes]:
+        """Return the offset and data to be written."""
+        offset = self._written_bytes
+        data = self._to_write.getvalue()
+        self._written_bytes += len(data)
+        self._to_write = BytesIO()
+        return (offset, data)
+
+    def get_queued_bytes(self) -> int:
+        """Return the number of queued, unwritten bytes."""
+        return self._to_write.tell()
+
+    def get_total_bytes(self) -> int:
+        """Return how many bytes were written or queued in total."""
+        return self._written_bytes + self.get_queued_bytes()
+
 @implementer(IStorageBucketWriter)
 class WriteBucketProxy(object):
+    """
+    Note: The various ``put_`` methods need to be called in the order in which
+    the bytes will get written.
+    """
     fieldsize = 4
     fieldstruct = ">L"

     def __init__(self, rref, server, data_size, block_size, num_segments,
-                 num_share_hashes, uri_extension_size, pipeline_size=50000):
+                 num_share_hashes, uri_extension_size, batch_size=1_000_000):
         self._rref = rref
         self._server = server
         self._data_size = data_size
         self._block_size = block_size
         self._num_segments = num_segments
-        self._written_bytes = 0

         effective_segments = mathutil.next_power_of_k(num_segments,2)
         self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
@@ -130,11 +166,13 @@ class WriteBucketProxy(object):

         self._create_offsets(block_size, data_size)

-        # k=3, max_segment_size=128KiB gives us a typical segment of 43691
-        # bytes. Setting the default pipeline_size to 50KB lets us get two
-        # segments onto the wire but not a third, which would keep the pipe
-        # filled.
-        self._pipeline = pipeline.Pipeline(pipeline_size)
+        # With a ~1MB batch size, max upload speed is 1MB/(round-trip latency)
+        # assuming the writing code waits for writes to finish, so 20MB/sec if
+        # latency is 50ms. In the US many people only have 1MB/sec upload speed
+        # as of 2022 (standard Comcast). For further discussion of how one
+        # might set batch sizes see
+        # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3787#comment:1.
+        self._write_buffer = _WriteBuffer(batch_size)

     def get_allocated_size(self):
         return (self._offsets['uri_extension'] + self.fieldsize +
@@ -179,7 +217,7 @@ class WriteBucketProxy(object):
         return "<WriteBucketProxy for node %r>" % self._server.get_name()

     def put_header(self):
-        return self._write(0, self._offset_data)
+        return self._queue_write(0, self._offset_data)

     def put_block(self, segmentnum, data):
         offset = self._offsets['data'] + segmentnum * self._block_size
@@ -193,13 +231,13 @@ class WriteBucketProxy(object):
                       (self._block_size *
                        (self._num_segments - 1))),
                      len(data), self._block_size)
-        return self._write(offset, data)
+        return self._queue_write(offset, data)

     def put_crypttext_hashes(self, hashes):
        # plaintext_hash_tree precedes crypttext_hash_tree. It is not used, and
        # so is not explicitly written, but we need to write everything, so
        # fill it in with nulls.
-        d = self._write(self._offsets['plaintext_hash_tree'], b"\x00" * self._segment_hash_size)
+        d = self._queue_write(self._offsets['plaintext_hash_tree'], b"\x00" * self._segment_hash_size)
         d.addCallback(lambda _: self._really_put_crypttext_hashes(hashes))
         return d
@@ -212,7 +250,7 @@ class WriteBucketProxy(object):
         precondition(offset + len(data) <= self._offsets['block_hashes'],
                      offset, len(data), offset+len(data),
                      self._offsets['block_hashes'])
-        return self._write(offset, data)
+        return self._queue_write(offset, data)

     def put_block_hashes(self, blockhashes):
         offset = self._offsets['block_hashes']
@@ -223,7 +261,7 @@ class WriteBucketProxy(object):
         precondition(offset + len(data) <= self._offsets['share_hashes'],
                      offset, len(data), offset+len(data),
                      self._offsets['share_hashes'])
-        return self._write(offset, data)
+        return self._queue_write(offset, data)

     def put_share_hashes(self, sharehashes):
         # sharehashes is a list of (index, hash) tuples, so they get stored
@@ -237,29 +275,45 @@ class WriteBucketProxy(object):
         precondition(offset + len(data) <= self._offsets['uri_extension'],
                      offset, len(data), offset+len(data),
                      self._offsets['uri_extension'])
-        return self._write(offset, data)
+        return self._queue_write(offset, data)

     def put_uri_extension(self, data):
         offset = self._offsets['uri_extension']
         assert isinstance(data, bytes)
         precondition(len(data) == self._uri_extension_size)
         length = struct.pack(self.fieldstruct, len(data))
-        return self._write(offset, length+data)
+        return self._queue_write(offset, length+data)

-    def _write(self, offset, data):
-        # use a Pipeline to pipeline several writes together. TODO: another
-        # speedup would be to coalesce small writes into a single call: this
-        # would reduce the foolscap CPU overhead per share, but wouldn't
-        # reduce the number of round trips, so it might not be worth the
-        # effort.
-        self._written_bytes += len(data)
-        return self._pipeline.add(len(data),
-                                  self._rref.callRemote, "write", offset, data)
+    def _queue_write(self, offset, data):
+        """
+        This queues up small writes to be written in a single batched larger
+        write.
+
+        Callers of this function are expected to queue the data in order, with
+        no holes. As such, the offset is technically unnecessary, but is used
+        to check the inputs. Possibly we should get rid of it.
+        """
+        assert offset == self._write_buffer.get_total_bytes()
+        if self._write_buffer.queue_write(data):
+            return self._actually_write()
+        else:
+            return defer.succeed(False)
+
+    def _actually_write(self):
+        """Write data to the server."""
+        offset, data = self._write_buffer.flush()
+        return self._rref.callRemote("write", offset, data)

     def close(self):
-        assert self._written_bytes == self.get_allocated_size(), f"{self._written_bytes} != {self.get_allocated_size()}"
-        d = self._pipeline.add(0, self._rref.callRemote, "close")
-        d.addCallback(lambda ign: self._pipeline.flush())
+        assert self._write_buffer.get_total_bytes() == self.get_allocated_size(), (
+            f"{self._write_buffer.get_total_bytes()} != {self.get_allocated_size()}"
+        )
+        if self._write_buffer.get_queued_bytes() > 0:
+            d = self._actually_write()
+        else:
+            # No data queued, don't send empty string write.
+            d = defer.succeed(True)
+        d.addCallback(lambda _: self._rref.callRemote("close"))
         return d

     def abort(self):
@@ -371,16 +425,16 @@ class ReadBucketProxy(object):
         self._fieldsize = fieldsize
         self._fieldstruct = fieldstruct

-        for field in ( 'data',
+        for field_name in ( 'data',
                        'plaintext_hash_tree', # UNUSED
                        'crypttext_hash_tree',
                        'block_hashes',
                        'share_hashes',
                        'uri_extension',
                        ):
             offset = struct.unpack(fieldstruct, data[x:x+fieldsize])[0]
             x += fieldsize
-            self._offsets[field] = offset
+            self._offsets[field_name] = offset
         return self._offsets

     def _get_block_data(self, unused, blocknum, blocksize, thisblocksize):
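
To see the new buffering end to end, here is a small standalone sketch (not part of the patch; it assumes the _WriteBuffer added above is importable): queue_write() returns True once at least batch_size bytes are queued, and flush() hands back the (offset, data) pair for a single real network write.

    from allmydata.immutable.layout import _WriteBuffer

    wb = _WriteBuffer(8)                      # batch writes of >= 8 bytes
    assert wb.queue_write(b"abc") is False    # 3 bytes queued: keep buffering
    assert wb.queue_write(b"defgh") is True   # 8 bytes queued: time to flush
    assert wb.flush() == (0, b"abcdefgh")     # one batched write at offset 0

    wb.queue_write(b"xy")                     # next batch continues the stream
    assert wb.flush() == (8, b"xy")           # offsets advance with no holes
    assert wb.get_total_bytes() == 10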

View File

@@ -1,198 +0,0 @@
-"""
-Tests for allmydata.util.pipeline.
-
-Ported to Python 3.
-"""
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-
-from future.utils import PY2
-if PY2:
-    from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min  # noqa: F401
-
-import gc
-
-from twisted.internet import defer
-from twisted.trial import unittest
-from twisted.python import log
-from twisted.python.failure import Failure
-
-from allmydata.util import pipeline
-
-
-class Pipeline(unittest.TestCase):
-    def pause(self, *args, **kwargs):
-        d = defer.Deferred()
-        self.calls.append( (d, args, kwargs) )
-        return d
-
-    def failUnlessCallsAre(self, expected):
-        #print(self.calls)
-        #print(expected)
-        self.failUnlessEqual(len(self.calls), len(expected), self.calls)
-        for i,c in enumerate(self.calls):
-            self.failUnlessEqual(c[1:], expected[i], str(i))
-
-    def test_basic(self):
-        self.calls = []
-        finished = []
-        p = pipeline.Pipeline(100)
-
-        d = p.flush() # fires immediately
-        d.addCallbacks(finished.append, log.err)
-        self.failUnlessEqual(len(finished), 1)
-        finished = []
-
-        d = p.add(10, self.pause, "one")
-        # the call should start right away, and our return Deferred should
-        # fire right away
-        d.addCallbacks(finished.append, log.err)
-        self.failUnlessEqual(len(finished), 1)
-        self.failUnlessEqual(finished[0], None)
-        self.failUnlessCallsAre([ ( ("one",) , {} ) ])
-        self.failUnlessEqual(p.gauge, 10)
-
-        # pipeline: [one]
-
-        finished = []
-        d = p.add(20, self.pause, "two", kw=2)
-        # pipeline: [one, two]
-
-        # the call and the Deferred should fire right away
-        d.addCallbacks(finished.append, log.err)
-        self.failUnlessEqual(len(finished), 1)
-        self.failUnlessEqual(finished[0], None)
-        self.failUnlessCallsAre([ ( ("one",) , {} ),
-                                  ( ("two",) , {"kw": 2} ),
-                                  ])
-        self.failUnlessEqual(p.gauge, 30)
-
-        self.calls[0][0].callback("one-result")
-        # pipeline: [two]
-        self.failUnlessEqual(p.gauge, 20)
-
-        finished = []
-        d = p.add(90, self.pause, "three", "posarg1")
-        # pipeline: [two, three]
-        flushed = []
-        fd = p.flush()
-        fd.addCallbacks(flushed.append, log.err)
-        self.failUnlessEqual(flushed, [])
-
-        # the call will be made right away, but the return Deferred will not,
-        # because the pipeline is now full.
-        d.addCallbacks(finished.append, log.err)
-        self.failUnlessEqual(len(finished), 0)
-        self.failUnlessCallsAre([ ( ("one",) , {} ),
-                                  ( ("two",) , {"kw": 2} ),
-                                  ( ("three", "posarg1"), {} ),
-                                  ])
-        self.failUnlessEqual(p.gauge, 110)
-
-        self.failUnlessRaises(pipeline.SingleFileError, p.add, 10, self.pause)
-
-        # retiring either call will unblock the pipeline, causing the #3
-        # Deferred to fire
-        self.calls[2][0].callback("three-result")
-        # pipeline: [two]
-
-        self.failUnlessEqual(len(finished), 1)
-        self.failUnlessEqual(finished[0], None)
-        self.failUnlessEqual(flushed, [])
-
-        # retiring call#2 will finally allow the flush() Deferred to fire
-        self.calls[1][0].callback("two-result")
-        self.failUnlessEqual(len(flushed), 1)
-
-    def test_errors(self):
-        self.calls = []
-        p = pipeline.Pipeline(100)
-
-        d1 = p.add(200, self.pause, "one")
-        d2 = p.flush()
-
-        finished = []
-        d1.addBoth(finished.append)
-        self.failUnlessEqual(finished, [])
-
-        flushed = []
-        d2.addBoth(flushed.append)
-        self.failUnlessEqual(flushed, [])
-
-        self.calls[0][0].errback(ValueError("oops"))
-
-        self.failUnlessEqual(len(finished), 1)
-        f = finished[0]
-        self.failUnless(isinstance(f, Failure))
-        self.failUnless(f.check(pipeline.PipelineError))
-        self.failUnlessIn("PipelineError", str(f.value))
-        self.failUnlessIn("ValueError", str(f.value))
-        r = repr(f.value)
-        self.failUnless("ValueError" in r, r)
-        f2 = f.value.error
-        self.failUnless(f2.check(ValueError))
-
-        self.failUnlessEqual(len(flushed), 1)
-        f = flushed[0]
-        self.failUnless(isinstance(f, Failure))
-        self.failUnless(f.check(pipeline.PipelineError))
-        f2 = f.value.error
-        self.failUnless(f2.check(ValueError))
-
-        # now that the pipeline is in the failed state, any new calls will
-        # fail immediately
-        d3 = p.add(20, self.pause, "two")
-
-        finished = []
-        d3.addBoth(finished.append)
-        self.failUnlessEqual(len(finished), 1)
-        f = finished[0]
-        self.failUnless(isinstance(f, Failure))
-        self.failUnless(f.check(pipeline.PipelineError))
-        r = repr(f.value)
-        self.failUnless("ValueError" in r, r)
-        f2 = f.value.error
-        self.failUnless(f2.check(ValueError))
-
-        d4 = p.flush()
-        flushed = []
-        d4.addBoth(flushed.append)
-        self.failUnlessEqual(len(flushed), 1)
-        f = flushed[0]
-        self.failUnless(isinstance(f, Failure))
-        self.failUnless(f.check(pipeline.PipelineError))
-        f2 = f.value.error
-        self.failUnless(f2.check(ValueError))
-
-    def test_errors2(self):
-        self.calls = []
-        p = pipeline.Pipeline(100)
-
-        d1 = p.add(10, self.pause, "one")
-        d2 = p.add(20, self.pause, "two")
-        d3 = p.add(30, self.pause, "three")
-        d4 = p.flush()
-
-        # one call fails, then the second one succeeds: make sure
-        # ExpandableDeferredList tolerates the second one
-
-        flushed = []
-        d4.addBoth(flushed.append)
-        self.failUnlessEqual(flushed, [])
-
-        self.calls[0][0].errback(ValueError("oops"))
-        self.failUnlessEqual(len(flushed), 1)
-        f = flushed[0]
-        self.failUnless(isinstance(f, Failure))
-        self.failUnless(f.check(pipeline.PipelineError))
-        f2 = f.value.error
-        self.failUnless(f2.check(ValueError))
-
-        self.calls[1][0].callback("two-result")
-        self.calls[2][0].errback(ValueError("three-error"))
-
-        del d1,d2,d3,d4
-        gc.collect() # for PyPy

View File

@@ -3,14 +3,9 @@ Tests for allmydata.storage.

 Ported to Python 3.
 """

-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
+from __future__ import annotations

-from future.utils import native_str, PY2, bytes_to_native_str, bchr
-if PY2:
-    from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min  # noqa: F401
+from future.utils import native_str, bytes_to_native_str, bchr
 from six import ensure_str

 from io import (
@ -59,7 +54,7 @@ from allmydata.storage.common import storage_index_to_dir, \
si_b2a, si_a2b si_b2a, si_a2b
from allmydata.storage.lease import LeaseInfo from allmydata.storage.lease import LeaseInfo
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \ from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
ReadBucketProxy ReadBucketProxy, _WriteBuffer
from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \ from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \
LayoutInvalid, MDMFSIGNABLEHEADER, \ LayoutInvalid, MDMFSIGNABLEHEADER, \
SIGNED_PREFIX, MDMFHEADER, \ SIGNED_PREFIX, MDMFHEADER, \
@ -3746,3 +3741,39 @@ class LeaseInfoTests(SyncTestCase):
info.to_mutable_data(), info.to_mutable_data(),
HasLength(info.mutable_size()), HasLength(info.mutable_size()),
) )
class WriteBufferTests(SyncTestCase):
"""Tests for ``_WriteBuffer``."""
@given(
small_writes=strategies.lists(
strategies.binary(min_size=1, max_size=20),
min_size=10, max_size=20),
batch_size=strategies.integers(min_value=5, max_value=10)
)
def test_write_buffer(self, small_writes: list[bytes], batch_size: int):
"""
``_WriteBuffer`` coalesces small writes into bigger writes based on
the batch size.
"""
wb = _WriteBuffer(batch_size)
result = b""
for data in small_writes:
should_flush = wb.queue_write(data)
if should_flush:
flushed_offset, flushed_data = wb.flush()
self.assertEqual(flushed_offset, len(result))
# The flushed data is in batch sizes, or closest approximation
# given queued inputs:
self.assertTrue(batch_size <= len(flushed_data) < batch_size + len(data))
result += flushed_data
# Final flush:
remaining_length = wb.get_queued_bytes()
flushed_offset, flushed_data = wb.flush()
self.assertEqual(remaining_length, len(flushed_data))
self.assertEqual(flushed_offset, len(result))
result += flushed_data
self.assertEqual(result, b"".join(small_writes))
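
The property asserted above boils down to: however the batch boundaries fall, concatenating every flushed chunk reproduces the original byte stream, in order and with no holes. A hand-run instance of the same invariant (a sketch outside Hypothesis, reusing the imported _WriteBuffer):

    wb = _WriteBuffer(5)
    result = b""
    for data in [b"abc", b"de", b"fghij", b"k"]:
        if wb.queue_write(data):          # True once >= 5 bytes are queued
            offset, flushed = wb.flush()
            assert offset == len(result)
            result += flushed
    offset, flushed = wb.flush()          # final flush of the remainder
    assert (offset, flushed) == (10, b"k")
    assert result + flushed == b"abcdefghijk"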

View File

@@ -1,149 +0,0 @@
-"""
-A pipeline of Deferreds.
-
-Ported to Python 3.
-"""
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-
-from future.utils import PY2
-if PY2:
-    from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min  # noqa: F401
-
-from twisted.internet import defer
-from twisted.python.failure import Failure
-from twisted.python import log
-from allmydata.util.assertutil import precondition
-
-
-class PipelineError(Exception):
-    """One of the pipelined messages returned an error. The received Failure
-    object is stored in my .error attribute."""
-    def __init__(self, error):
-        self.error = error
-
-    def __repr__(self):
-        return "<PipelineError error=(%r)>" % (self.error,)
-    def __str__(self):
-        return "<PipelineError error=(%s)>" % (self.error,)
-
-class SingleFileError(Exception):
-    """You are not permitted to add a job to a full pipeline."""
-
-
-class ExpandableDeferredList(defer.Deferred, object):
-    # like DeferredList(fireOnOneErrback=True) with a built-in
-    # gatherResults(), but you can add new Deferreds until you close it. This
-    # gives you a chance to add don't-complain-about-unhandled-error errbacks
-    # immediately after attachment, regardless of whether you actually end up
-    # wanting the list or not.
-    def __init__(self):
-        defer.Deferred.__init__(self)
-        self.resultsReceived = 0
-        self.resultList = []
-        self.failure = None
-        self.closed = False
-
-    def addDeferred(self, d):
-        precondition(not self.closed, "don't call addDeferred() on a closed ExpandableDeferredList")
-        index = len(self.resultList)
-        self.resultList.append(None)
-        d.addCallbacks(self._cbDeferred, self._ebDeferred,
-                       callbackArgs=(index,))
-        return d
-
-    def close(self):
-        self.closed = True
-        self.checkForFinished()
-
-    def checkForFinished(self):
-        if not self.closed:
-            return
-        if self.called:
-            return
-        if self.failure:
-            self.errback(self.failure)
-        elif self.resultsReceived == len(self.resultList):
-            self.callback(self.resultList)
-
-    def _cbDeferred(self, res, index):
-        self.resultList[index] = res
-        self.resultsReceived += 1
-        self.checkForFinished()
-        return res
-
-    def _ebDeferred(self, f):
-        self.failure = f
-        self.checkForFinished()
-        return f
-
-
-class Pipeline(object):
-    """I manage a size-limited pipeline of Deferred operations, usually
-    callRemote() messages."""
-
-    def __init__(self, capacity):
-        self.capacity = capacity # how full we can be
-        self.gauge = 0 # how full we are
-        self.failure = None
-        self.waiting = [] # callers of add() who are blocked
-        self.unflushed = ExpandableDeferredList()
-
-    def add(self, _size, _func, *args, **kwargs):
-        # We promise that all the Deferreds we return will fire in the order
-        # they were returned. To make it easier to keep this promise, we
-        # prohibit multiple outstanding calls to add() .
-        if self.waiting:
-            raise SingleFileError
-        if self.failure:
-            return defer.fail(self.failure)
-        self.gauge += _size
-        fd = defer.maybeDeferred(_func, *args, **kwargs)
-        fd.addBoth(self._call_finished, _size)
-        self.unflushed.addDeferred(fd)
-        fd.addErrback(self._eat_pipeline_errors)
-        fd.addErrback(log.err, "_eat_pipeline_errors didn't eat it")
-        if self.gauge < self.capacity:
-            return defer.succeed(None)
-        d = defer.Deferred()
-        self.waiting.append(d)
-        return d
-
-    def flush(self):
-        if self.failure:
-            return defer.fail(self.failure)
-        d, self.unflushed = self.unflushed, ExpandableDeferredList()
-        d.close()
-        d.addErrback(self._flushed_error)
-        return d
-
-    def _flushed_error(self, f):
-        precondition(self.failure) # should have been set by _call_finished
-        return self.failure
-
-    def _call_finished(self, res, size):
-        self.gauge -= size
-        if isinstance(res, Failure):
-            res = Failure(PipelineError(res))
-        if not self.failure:
-            self.failure = res
-        if self.failure:
-            while self.waiting:
-                d = self.waiting.pop(0)
-                d.errback(self.failure)
-        else:
-            while self.waiting and (self.gauge < self.capacity):
-                d = self.waiting.pop(0)
-                d.callback(None)
-                # the d.callback() might trigger a new call to add(), which
-                # will raise our gauge and might cause the pipeline to be
-                # filled. So the while() loop gets a chance to tell the
-                # caller to stop.
-        return res
-
-    def _eat_pipeline_errors(self, f):
-        f.trap(PipelineError)
-        return None
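
For contrast with the batching that replaces it, this is roughly how the removed Pipeline was driven by the old WriteBucketProxy._write() (a sketch reconstructed from the deleted code above; p is a Pipeline and rref a Foolscap remote reference): every write charged its size against a 50KB window, and backpressure came from withholding the returned Deferred when the window was full, but each small write still cost its own remote call.

    def pipelined_write(p, rref, offset, data):
        # The returned Deferred fires at once while p.gauge < p.capacity;
        # once the window is full, it is withheld until an earlier
        # callRemote retires, telling the caller to pause (backpressure).
        # Each small write is still its own round trip, which the new
        # _WriteBuffer batching avoids.
        return p.add(len(data), rref.callRemote, "write", offset, data)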