Refactor to use BytesIO.

Itamar Turner-Trauring 2022-11-22 14:22:54 -05:00
parent a4787ca45e
commit f638aec0af


@@ -1,21 +1,14 @@
 """
 Ported to Python 3.
 """
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
 
-from future.utils import PY2
-if PY2:
-    from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min  # noqa: F401
-
 import struct
+from io import BytesIO
 
 from zope.interface import implementer
 from twisted.internet import defer
 from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
      FileTooLargeError, HASH_SIZE
-from allmydata.util import mathutil, observer, pipeline, log
+from allmydata.util import mathutil, observer, log
 from allmydata.util.assertutil import precondition
 from allmydata.storage.server import si_b2a
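
The refactor replaces accumulating outgoing data in a bytes object with an io.BytesIO buffer. Every `buf += chunk` on a bytes object builds a new object and copies everything queued so far, so queueing many small writes costs quadratic copying; BytesIO appends into a growable internal buffer and copies once when the batch is flushed. A minimal standalone comparison, illustrative only and not code from this commit:

    from io import BytesIO

    chunks = [b"x" * 1024 for _ in range(1000)]

    # bytes concatenation: each += copies the whole accumulated buffer again
    buf = b""
    for c in chunks:
        buf += c

    # BytesIO: each write() appends in place; one copy at flush time
    bio = BytesIO()
    for c in chunks:
        bio.write(c)
    data = bio.getvalue()

    assert data == buf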
@@ -120,7 +113,7 @@ class WriteBucketProxy(object):
         self._block_size = block_size
         self._num_segments = num_segments
         self._written_bytes = 0
-        self._to_write = b""
+        self._to_write = BytesIO()
 
         effective_segments = mathutil.next_power_of_k(num_segments,2)
         self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
@@ -258,9 +251,10 @@ class WriteBucketProxy(object):
         no holes. As such, the offset is technically unnecessary, but is used
         to check the inputs. Possibly we should get rid of it.
         """
-        assert offset == self._written_bytes + len(self._to_write)
-        self._to_write += data
-        if len(self._to_write) >= self._batch_size:
+        queued_size = len(self._to_write.getbuffer())
+        assert offset == self._written_bytes + queued_size
+        self._to_write.write(data)
+        if queued_size + len(data) >= self._batch_size:
             return self._actually_write()
         else:
             return defer.succeed(False)
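
The rewritten write path queues incoming data in the BytesIO and only flushes once the queued bytes reach `self._batch_size`. The same batching pattern in a self-contained sketch; the `BatchedWriter` class and `flush_fn` callback here are hypothetical stand-ins for the proxy and its remote "write" call, and the Deferred machinery is omitted:

    from io import BytesIO

    class BatchedWriter:
        """Queue sequential writes and flush them in batches (sketch)."""

        def __init__(self, flush_fn, batch_size=65536):
            self._flush_fn = flush_fn      # called as flush_fn(offset, data)
            self._batch_size = batch_size
            self._written = 0              # bytes already flushed downstream
            self._queued = BytesIO()       # bytes waiting to be flushed

        def write(self, offset, data):
            queued_size = len(self._queued.getbuffer())
            # Writes must arrive in order, with no gaps or overlaps.
            assert offset == self._written + queued_size
            self._queued.write(data)
            if queued_size + len(data) >= self._batch_size:
                self.flush()

        def flush(self):
            data = self._queued.getvalue()
            self._queued = BytesIO()
            self._flush_fn(self._written, data)
            self._written += len(data)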
@@ -268,14 +262,14 @@ class WriteBucketProxy(object):
     def _actually_write(self):
         """Actually write data to the server."""
         offset = self._written_bytes
-        data = self._to_write
-        self._written_bytes += len(self._to_write)
-        self._to_write = b""
+        data = self._to_write.getvalue()
+        self._written_bytes += len(data)
+        self._to_write = BytesIO()
         return self._rref.callRemote("write", offset, data)
 
     def close(self):
-        assert self._written_bytes + len(self._to_write) == self.get_allocated_size(), (
-            f"{self._written_bytes} + {len(self._to_write)} != {self.get_allocated_size()}"
+        assert self._written_bytes + len(self._to_write.getbuffer()) == self.get_allocated_size(), (
+            f"{self._written_bytes} + {len(self._to_write.getbuffer())} != {self.get_allocated_size()}"
         )
         d = self._actually_write()
         d.addCallback(lambda _: self._rref.callRemote("close"))