Refactor to make core data structure easier to test in isolation.

Itamar Turner-Trauring 2022-11-22 15:17:56 -05:00
parent f638aec0af
commit d86d578034


@@ -2,8 +2,12 @@
 Ported to Python 3.
 """
 
+from __future__ import annotations
+
 import struct
 from io import BytesIO
+
+from attrs import define, field
 from zope.interface import implementer
 from twisted.internet import defer
 from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
@@ -100,6 +104,43 @@ def make_write_bucket_proxy(rref, server,
                            num_share_hashes, uri_extension_size)
     return wbp
 
+
+@define
+class _WriteBuffer:
+    """
+    Queue up small writes to be written in a single batched larger write.
+    """
+    _batch_size: int
+    _to_write: BytesIO = field(factory=BytesIO)
+    _written_bytes: int = field(default=0)
+
+    def queue_write(self, offset: int, data: bytes) -> bool:
+        """
+        Queue a write. If the result is ``False``, no further action is needed
+        for now. If the result is ``True``, it's time to call ``flush()`` and
+        do a real write.
+
+        Callers of this function are expected to queue the data in order, with
+        no holes. As such, the offset is technically unnecessary, but is used
+        to check the inputs. Possibly we should get rid of it.
+        """
+        assert offset == self.get_total_bytes_queued()
+        self._to_write.write(data)
+        return len(self._to_write.getbuffer()) >= self._batch_size
+
+    def flush(self) -> tuple[int, bytes]:
+        """Return the offset and data to be written."""
+        offset = self._written_bytes
+        data = self._to_write.getvalue()
+        self._written_bytes += len(data)
+        self._to_write = BytesIO()
+        return (offset, data)
+
+    def get_total_bytes_queued(self) -> int:
+        """Return how many bytes were written or queued in total."""
+        return self._written_bytes + len(self._to_write.getbuffer())
+
+
 @implementer(IStorageBucketWriter)
 class WriteBucketProxy(object):
     fieldsize = 4
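
The payoff of the refactor is visible here: _WriteBuffer has no Twisted or
server dependencies, so it can be exercised directly. A minimal sketch of an
isolated test (the import path, test name, and values are assumptions for
illustration):

    from allmydata.immutable.layout import _WriteBuffer  # assumed module path

    def test_write_buffer_batches_small_writes():
        buf = _WriteBuffer(batch_size=10)
        # Small writes accumulate in memory without triggering a flush:
        assert buf.queue_write(0, b"abc") is False
        assert buf.queue_write(3, b"def") is False
        assert buf.get_total_bytes_queued() == 6
        # Crossing the batch size signals that a real write is due:
        assert buf.queue_write(6, b"ghijk") is True
        offset, data = buf.flush()
        assert (offset, data) == (0, b"abcdefghijk")
        # Flushed bytes still count toward the running total:
        assert buf.get_total_bytes_queued() == 11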
@@ -112,8 +153,6 @@ class WriteBucketProxy(object):
         self._data_size = data_size
         self._block_size = block_size
         self._num_segments = num_segments
-        self._written_bytes = 0
-        self._to_write = BytesIO()
 
         effective_segments = mathutil.next_power_of_k(num_segments,2)
         self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
@@ -130,7 +169,7 @@
         # as of 2022 (standard Comcast). For further discussion of how one
         # might set batch sizes see
         # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3787#comment:1.
-        self._batch_size = batch_size
+        self._write_buffer = _WriteBuffer(batch_size)
 
     def get_allocated_size(self):
         return (self._offsets['uri_extension'] + self.fieldsize +
@@ -251,25 +290,19 @@
         no holes. As such, the offset is technically unnecessary, but is used
         to check the inputs. Possibly we should get rid of it.
         """
-        queued_size = len(self._to_write.getbuffer())
-        assert offset == self._written_bytes + queued_size
-        self._to_write.write(data)
-        if queued_size + len(data) >= self._batch_size:
+        if self._write_buffer.queue_write(offset, data):
             return self._actually_write()
         else:
             return defer.succeed(False)
 
     def _actually_write(self):
-        """Actually write data to the server."""
-        offset = self._written_bytes
-        data = self._to_write.getvalue()
-        self._written_bytes += len(data)
-        self._to_write = BytesIO()
+        """Write data to the server."""
+        offset, data = self._write_buffer.flush()
         return self._rref.callRemote("write", offset, data)
 
     def close(self):
-        assert self._written_bytes + len(self._to_write.getbuffer()) == self.get_allocated_size(), (
-            f"{self._written_bytes} + {len(self._to_write.getbuffer())} != {self.get_allocated_size()}"
+        assert self._write_buffer.get_total_bytes_queued() == self.get_allocated_size(), (
+            f"{self._write_buffer.get_total_bytes_queued()} != {self.get_allocated_size()}"
         )
         d = self._actually_write()
         d.addCallback(lambda _: self._rref.callRemote("close"))
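
The delegation above is thin enough to verify end to end with a fake remote
reference. A sketch of the pattern (_FakeRemote and _BatchingProxy are
hypothetical stand-ins for illustration, not part of this change):

    from twisted.internet import defer
    from allmydata.immutable.layout import _WriteBuffer  # assumed module path

    class _FakeRemote:
        """Records each batched write instead of sending it to a server."""
        def __init__(self):
            self.calls = []

        def callRemote(self, name, *args):
            self.calls.append((name,) + args)
            return defer.succeed(None)

    class _BatchingProxy:
        """Minimal mirror of how WriteBucketProxy now drives _WriteBuffer:
        queue small writes, send one remote call per full batch."""
        def __init__(self, rref, batch_size):
            self._rref = rref
            self._write_buffer = _WriteBuffer(batch_size)

        def write(self, offset, data):
            if self._write_buffer.queue_write(offset, data):
                return self._rref.callRemote("write", *self._write_buffer.flush())
            return defer.succeed(False)

    remote = _FakeRemote()
    proxy = _BatchingProxy(remote, batch_size=8)
    proxy.write(0, b"1234")   # 4 bytes queued, below the batch size
    proxy.write(4, b"56789")  # 9 bytes queued: triggers one batched write
    assert remote.calls == [("write", 0, b"123456789")]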
@@ -384,16 +417,16 @@
         self._fieldsize = fieldsize
         self._fieldstruct = fieldstruct
 
-        for field in ( 'data',
+        for field_name in ( 'data',
                             'plaintext_hash_tree', # UNUSED
                             'crypttext_hash_tree',
                             'block_hashes',
                             'share_hashes',
                             'uri_extension',
                             ):
             offset = struct.unpack(fieldstruct, data[x:x+fieldsize])[0]
             x += fieldsize
-            self._offsets[field] = offset
+            self._offsets[field_name] = offset
         return self._offsets
 
     def _get_block_data(self, unused, blocknum, blocksize, thisblocksize):
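
The loop variable is renamed here because `field` would now shadow the attrs
import added at the top of the module. For reference, the offsets table this
loop decodes is just a run of fixed-width big-endian integers; a standalone
sketch of the same parse, assuming a 4-byte ">L" field format (the real
fieldstruct/fieldsize come from the share's version header):

    import struct

    fieldstruct, fieldsize = ">L", 4  # assumed field format
    field_names = ('data', 'plaintext_hash_tree', 'crypttext_hash_tree',
                   'block_hashes', 'share_hashes', 'uri_extension')

    # Build a fake offsets table, then walk it the same way the loop above does:
    table = b"".join(struct.pack(fieldstruct, n)
                     for n in (36, 100, 200, 300, 400, 500))
    offsets, x = {}, 0
    for field_name in field_names:
        (offset,) = struct.unpack(fieldstruct, table[x:x + fieldsize])
        x += fieldsize
        offsets[field_name] = offset
    assert offsets['data'] == 36 and offsets['uri_extension'] == 500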