mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-04-13 13:53:07 +00:00
Batch writes much more aggressively.
This commit is contained in:
parent
c296071767
commit
a4787ca45e
@ -113,13 +113,14 @@ class WriteBucketProxy(object):
|
||||
fieldstruct = ">L"
|
||||
|
||||
def __init__(self, rref, server, data_size, block_size, num_segments,
|
||||
num_share_hashes, uri_extension_size, pipeline_size=50000):
|
||||
num_share_hashes, uri_extension_size, batch_size=1_000_000):
|
||||
self._rref = rref
|
||||
self._server = server
|
||||
self._data_size = data_size
|
||||
self._block_size = block_size
|
||||
self._num_segments = num_segments
|
||||
self._written_bytes = 0
|
||||
self._to_write = b""
|
||||
|
||||
effective_segments = mathutil.next_power_of_k(num_segments,2)
|
||||
self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
|
||||
@ -130,11 +131,13 @@ class WriteBucketProxy(object):
|
||||
|
||||
self._create_offsets(block_size, data_size)
|
||||
|
||||
# k=3, max_segment_size=128KiB gives us a typical segment of 43691
|
||||
# bytes. Setting the default pipeline_size to 50KB lets us get two
|
||||
# segments onto the wire but not a third, which would keep the pipe
|
||||
# filled.
|
||||
self._pipeline = pipeline.Pipeline(pipeline_size)
|
||||
# With a ~1MB batch size, max upload speed is 1MB * round-trip latency
|
||||
# assuming the writing code waits for writes to finish, so 20MB/sec if
|
||||
# latency is 50ms. In the US many people only have 1MB/sec upload speed
|
||||
# as of 2022 (standard Comcast). For further discussion of how one
|
||||
# might set batch sizes see
|
||||
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3787#comment:1.
|
||||
self._batch_size = batch_size
|
||||
|
||||
def get_allocated_size(self):
|
||||
return (self._offsets['uri_extension'] + self.fieldsize +
|
||||
@ -179,7 +182,7 @@ class WriteBucketProxy(object):
|
||||
return "<WriteBucketProxy for node %r>" % self._server.get_name()
|
||||
|
||||
def put_header(self):
|
||||
return self._write(0, self._offset_data)
|
||||
return self._queue_write(0, self._offset_data)
|
||||
|
||||
def put_block(self, segmentnum, data):
|
||||
offset = self._offsets['data'] + segmentnum * self._block_size
|
||||
@ -193,13 +196,13 @@ class WriteBucketProxy(object):
|
||||
(self._block_size *
|
||||
(self._num_segments - 1))),
|
||||
len(data), self._block_size)
|
||||
return self._write(offset, data)
|
||||
return self._queue_write(offset, data)
|
||||
|
||||
def put_crypttext_hashes(self, hashes):
|
||||
# plaintext_hash_tree precedes crypttext_hash_tree. It is not used, and
|
||||
# so is not explicitly written, but we need to write everything, so
|
||||
# fill it in with nulls.
|
||||
d = self._write(self._offsets['plaintext_hash_tree'], b"\x00" * self._segment_hash_size)
|
||||
d = self._queue_write(self._offsets['plaintext_hash_tree'], b"\x00" * self._segment_hash_size)
|
||||
d.addCallback(lambda _: self._really_put_crypttext_hashes(hashes))
|
||||
return d
|
||||
|
||||
@ -212,7 +215,7 @@ class WriteBucketProxy(object):
|
||||
precondition(offset + len(data) <= self._offsets['block_hashes'],
|
||||
offset, len(data), offset+len(data),
|
||||
self._offsets['block_hashes'])
|
||||
return self._write(offset, data)
|
||||
return self._queue_write(offset, data)
|
||||
|
||||
def put_block_hashes(self, blockhashes):
|
||||
offset = self._offsets['block_hashes']
|
||||
@ -223,7 +226,7 @@ class WriteBucketProxy(object):
|
||||
precondition(offset + len(data) <= self._offsets['share_hashes'],
|
||||
offset, len(data), offset+len(data),
|
||||
self._offsets['share_hashes'])
|
||||
return self._write(offset, data)
|
||||
return self._queue_write(offset, data)
|
||||
|
||||
def put_share_hashes(self, sharehashes):
|
||||
# sharehashes is a list of (index, hash) tuples, so they get stored
|
||||
@ -237,29 +240,45 @@ class WriteBucketProxy(object):
|
||||
precondition(offset + len(data) <= self._offsets['uri_extension'],
|
||||
offset, len(data), offset+len(data),
|
||||
self._offsets['uri_extension'])
|
||||
return self._write(offset, data)
|
||||
return self._queue_write(offset, data)
|
||||
|
||||
def put_uri_extension(self, data):
|
||||
offset = self._offsets['uri_extension']
|
||||
assert isinstance(data, bytes)
|
||||
precondition(len(data) == self._uri_extension_size)
|
||||
length = struct.pack(self.fieldstruct, len(data))
|
||||
return self._write(offset, length+data)
|
||||
return self._queue_write(offset, length+data)
|
||||
|
||||
def _write(self, offset, data):
|
||||
# use a Pipeline to pipeline several writes together. TODO: another
|
||||
# speedup would be to coalesce small writes into a single call: this
|
||||
# would reduce the foolscap CPU overhead per share, but wouldn't
|
||||
# reduce the number of round trips, so it might not be worth the
|
||||
# effort.
|
||||
self._written_bytes += len(data)
|
||||
return self._pipeline.add(len(data),
|
||||
self._rref.callRemote, "write", offset, data)
|
||||
def _queue_write(self, offset, data):
|
||||
"""
|
||||
This queues up small writes to be written in a single batched larger
|
||||
write.
|
||||
|
||||
Callers of this function are expected to queue the data in order, with
|
||||
no holes. As such, the offset is technically unnecessary, but is used
|
||||
to check the inputs. Possibly we should get rid of it.
|
||||
"""
|
||||
assert offset == self._written_bytes + len(self._to_write)
|
||||
self._to_write += data
|
||||
if len(self._to_write) >= self._batch_size:
|
||||
return self._actually_write()
|
||||
else:
|
||||
return defer.succeed(False)
|
||||
|
||||
def _actually_write(self):
|
||||
"""Actually write data to the server."""
|
||||
offset = self._written_bytes
|
||||
data = self._to_write
|
||||
self._written_bytes += len(self._to_write)
|
||||
self._to_write = b""
|
||||
return self._rref.callRemote("write", offset, data)
|
||||
|
||||
def close(self):
|
||||
assert self._written_bytes == self.get_allocated_size(), f"{self._written_bytes} != {self.get_allocated_size()}"
|
||||
d = self._pipeline.add(0, self._rref.callRemote, "close")
|
||||
d.addCallback(lambda ign: self._pipeline.flush())
|
||||
assert self._written_bytes + len(self._to_write) == self.get_allocated_size(), (
|
||||
f"{self._written_bytes} + {len(self._to_write)} != {self.get_allocated_size()}"
|
||||
)
|
||||
d = self._actually_write()
|
||||
d.addCallback(lambda _: self._rref.callRemote("close"))
|
||||
return d
|
||||
|
||||
def abort(self):
|
||||
|
Loading…
x
Reference in New Issue
Block a user