From a4787ca45ebebf3c59216366b5edf3a56f548003 Mon Sep 17 00:00:00 2001
From: Itamar Turner-Trauring
Date: Tue, 22 Nov 2022 14:12:14 -0500
Subject: [PATCH] Batch writes much more aggressively.

---
 src/allmydata/immutable/layout.py | 69 ++++++++++++++++++++-----------
 1 file changed, 44 insertions(+), 25 deletions(-)

diff --git a/src/allmydata/immutable/layout.py b/src/allmydata/immutable/layout.py
index d552d43c4..863b1cb75 100644
--- a/src/allmydata/immutable/layout.py
+++ b/src/allmydata/immutable/layout.py
@@ -113,13 +113,14 @@ class WriteBucketProxy(object):
     fieldstruct = ">L"
 
     def __init__(self, rref, server, data_size, block_size, num_segments,
-                 num_share_hashes, uri_extension_size, pipeline_size=50000):
+                 num_share_hashes, uri_extension_size, batch_size=1_000_000):
         self._rref = rref
         self._server = server
         self._data_size = data_size
         self._block_size = block_size
         self._num_segments = num_segments
         self._written_bytes = 0
+        self._to_write = b""
 
         effective_segments = mathutil.next_power_of_k(num_segments,2)
         self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
@@ -130,11 +131,13 @@ class WriteBucketProxy(object):
 
         self._create_offsets(block_size, data_size)
 
-        # k=3, max_segment_size=128KiB gives us a typical segment of 43691
-        # bytes. Setting the default pipeline_size to 50KB lets us get two
-        # segments onto the wire but not a third, which would keep the pipe
-        # filled.
-        self._pipeline = pipeline.Pipeline(pipeline_size)
+        # With a ~1MB batch size, max upload speed is 1MB / round-trip latency
+        # assuming the writing code waits for writes to finish, so 20MB/sec if
+        # latency is 50ms. In the US many people only have 1MB/sec upload speed
+        # as of 2022 (standard Comcast). For further discussion of how one
+        # might set batch sizes see
+        # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3787#comment:1.
+        self._batch_size = batch_size
 
     def get_allocated_size(self):
         return (self._offsets['uri_extension'] + self.fieldsize +
@@ -179,7 +182,7 @@ class WriteBucketProxy(object):
         return "<WriteBucketProxy for node %r>" % self._server.get_name()
 
     def put_header(self):
-        return self._write(0, self._offset_data)
+        return self._queue_write(0, self._offset_data)
 
     def put_block(self, segmentnum, data):
         offset = self._offsets['data'] + segmentnum * self._block_size
@@ -193,13 +196,13 @@ class WriteBucketProxy(object):
                                        (self._block_size *
                                         (self._num_segments - 1))),
                          len(data), self._block_size)
-        return self._write(offset, data)
+        return self._queue_write(offset, data)
 
     def put_crypttext_hashes(self, hashes):
         # plaintext_hash_tree precedes crypttext_hash_tree. It is not used, and
         # so is not explicitly written, but we need to write everything, so
         # fill it in with nulls.
-        d = self._write(self._offsets['plaintext_hash_tree'], b"\x00" * self._segment_hash_size)
+        d = self._queue_write(self._offsets['plaintext_hash_tree'], b"\x00" * self._segment_hash_size)
         d.addCallback(lambda _: self._really_put_crypttext_hashes(hashes))
         return d
 
@@ -212,7 +215,7 @@ class WriteBucketProxy(object):
         precondition(offset + len(data) <= self._offsets['block_hashes'],
                      offset, len(data), offset+len(data),
                      self._offsets['block_hashes'])
-        return self._write(offset, data)
+        return self._queue_write(offset, data)
 
     def put_block_hashes(self, blockhashes):
         offset = self._offsets['block_hashes']
@@ -223,7 +226,7 @@ class WriteBucketProxy(object):
         precondition(offset + len(data) <= self._offsets['share_hashes'],
                      offset, len(data), offset+len(data),
                      self._offsets['share_hashes'])
-        return self._write(offset, data)
+        return self._queue_write(offset, data)
 
     def put_share_hashes(self, sharehashes):
         # sharehashes is a list of (index, hash) tuples, so they get stored
@@ -237,29 +240,45 @@ class WriteBucketProxy(object):
         precondition(offset + len(data) <= self._offsets['uri_extension'],
                      offset, len(data), offset+len(data),
                      self._offsets['uri_extension'])
-        return self._write(offset, data)
+        return self._queue_write(offset, data)
 
     def put_uri_extension(self, data):
         offset = self._offsets['uri_extension']
         assert isinstance(data, bytes)
         precondition(len(data) == self._uri_extension_size)
         length = struct.pack(self.fieldstruct, len(data))
-        return self._write(offset, length+data)
+        return self._queue_write(offset, length+data)
 
-    def _write(self, offset, data):
-        # use a Pipeline to pipeline several writes together. TODO: another
-        # speedup would be to coalesce small writes into a single call: this
-        # would reduce the foolscap CPU overhead per share, but wouldn't
-        # reduce the number of round trips, so it might not be worth the
-        # effort.
-        self._written_bytes += len(data)
-        return self._pipeline.add(len(data),
-                                  self._rref.callRemote, "write", offset, data)
+    def _queue_write(self, offset, data):
+        """
+        This queues up small writes to be written in a single batched larger
+        write.
+
+        Callers of this function are expected to queue the data in order, with
+        no holes. As such, the offset is technically unnecessary, but is used
+        to check the inputs. Possibly we should get rid of it.
+        """
+        assert offset == self._written_bytes + len(self._to_write)
+        self._to_write += data
+        if len(self._to_write) >= self._batch_size:
+            return self._actually_write()
+        else:
+            return defer.succeed(False)
+
+    def _actually_write(self):
+        """Actually write data to the server."""
+        offset = self._written_bytes
+        data = self._to_write
+        self._written_bytes += len(self._to_write)
+        self._to_write = b""
+        return self._rref.callRemote("write", offset, data)
 
     def close(self):
-        assert self._written_bytes == self.get_allocated_size(), f"{self._written_bytes} != {self.get_allocated_size()}"
-        d = self._pipeline.add(0, self._rref.callRemote, "close")
-        d.addCallback(lambda ign: self._pipeline.flush())
+        assert self._written_bytes + len(self._to_write) == self.get_allocated_size(), (
+            f"{self._written_bytes} + {len(self._to_write)} != {self.get_allocated_size()}"
+        )
+        d = self._actually_write()
+        d.addCallback(lambda _: self._rref.callRemote("close"))
         return d
 
     def abort(self):
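
Reviewer note on the new default: the batch-size comment's arithmetic assumes
one batch in flight at a time, so upload throughput is bounded by batch size
divided by round-trip latency. A quick illustrative check in Python, using the
numbers from the comment (not new measurements):

    batch_size = 1_000_000  # bytes, the new default
    rtt = 0.050             # seconds, the comment's 50ms round-trip latency

    # One batch per round trip bounds throughput at batch_size / rtt:
    max_throughput = batch_size / rtt  # 20_000_000 bytes/sec, i.e. ~20MB/sec

Against the ~1MB/sec consumer uplinks the comment mentions, the link itself,
not this bound, is the bottleneck.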
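
The queue/flush logic is easy to model outside of Tahoe. Below is a minimal
standalone sketch of the same batching pattern; it is not the patched class.
BatchedWriter and remote_write are hypothetical stand-ins for WriteBucketProxy
and rref.callRemote("write", ...), and close()/error handling are simplified.

    from twisted.internet import defer

    class BatchedWriter:
        """Illustrative model of the patch's queue/flush behavior."""

        def __init__(self, remote_write, batch_size=1_000_000):
            self._remote_write = remote_write  # stands in for a remote "write" call
            self._batch_size = batch_size
            self._written_bytes = 0  # bytes already sent to the server
            self._to_write = b""     # bytes queued but not yet sent

        def queue_write(self, offset, data):
            # As in the patch, callers must write in order with no holes, so
            # the offset only serves as a sanity check.
            assert offset == self._written_bytes + len(self._to_write)
            self._to_write += data
            if len(self._to_write) >= self._batch_size:
                return self._flush()
            return defer.succeed(False)

        def _flush(self):
            # Send everything queued so far as one batched write.
            offset = self._written_bytes
            data, self._to_write = self._to_write, b""
            self._written_bytes += len(data)
            return self._remote_write(offset, data)

        def close(self):
            # Mirror the patched close(): flush any remainder first.
            return self._flush()

    # Example: with a tiny batch size, the first call only queues, the
    # second crosses the threshold and triggers a single batched write.
    calls = []
    w = BatchedWriter(lambda off, data: defer.succeed(calls.append((off, data))),
                      batch_size=10)
    w.queue_write(0, b"abc")       # queued; nothing sent yet
    w.queue_write(3, b"defghij")   # 10 bytes queued -> one write at offset 0
    w.close()                      # flushes whatever (if anything) remains
    assert calls[0] == (0, b"abcdefghij")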