immutable WriteBucketProxy: use pipeline to speed up uploads by overlapping roundtrips, for #392

This commit is contained in:
Brian Warner 2009-05-18 16:44:22 -07:00
parent e76c6b606f
commit 79437baade

View File

@ -3,7 +3,7 @@ from zope.interface import implements
from twisted.internet import defer
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
FileTooLargeError, HASH_SIZE
from allmydata.util import mathutil, idlib, observer
from allmydata.util import mathutil, idlib, observer, pipeline
from allmydata.util.assertutil import precondition
from allmydata.storage.server import si_b2a
@ -93,7 +93,8 @@ class WriteBucketProxy:
fieldstruct = ">L"
def __init__(self, rref, data_size, block_size, num_segments,
num_share_hashes, uri_extension_size_max, nodeid):
num_share_hashes, uri_extension_size_max, nodeid,
pipeline_size=50000):
self._rref = rref
self._data_size = data_size
self._block_size = block_size
@ -110,6 +111,12 @@ class WriteBucketProxy:
self._create_offsets(block_size, data_size)
# k=3, max_segment_size=128KiB gives us a typical segment of 43691
# bytes. Setting the default pipeline_size to 50KB lets us get two
# segments onto the wire but not a third, which would keep the pipe
# filled.
self._pipeline = pipeline.Pipeline(pipeline_size)
def get_allocated_size(self):
return (self._offsets['uri_extension'] + self.fieldsize +
self._uri_extension_size_max)
@ -218,11 +225,19 @@ class WriteBucketProxy:
return self._write(offset, length+data)
def _write(self, offset, data):
# TODO: for small shares, buffer the writes and do just a single call
return self._rref.callRemote("write", offset, data)
# use a Pipeline to pipeline several writes together. TODO: another
# speedup would be to coalesce small writes into a single call: this
# would reduce the foolscap CPU overhead per share, but wouldn't
# reduce the number of round trips, so it might not be worth the
# effort.
return self._pipeline.add(len(data),
self._rref.callRemote, "write", offset, data)
def close(self):
return self._rref.callRemote("close")
d = self._pipeline.add(0, self._rref.callRemote, "close")
d.addCallback(lambda ign: self._pipeline.flush())
return d
def abort(self):
return self._rref.callRemoteOnly("abort")