mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-01-19 11:16:24 +00:00
parent
550d67f51f
commit
3668cb3d06
@ -123,9 +123,8 @@ class Share:
|
||||
# use the upload-side code to get this as accurate as possible
|
||||
ht = IncompleteHashTree(N)
|
||||
num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
|
||||
wbp = make_write_bucket_proxy(None, share_size, r["block_size"],
|
||||
r["num_segments"], num_share_hashes, 0,
|
||||
None)
|
||||
wbp = make_write_bucket_proxy(None, None, share_size, r["block_size"],
|
||||
r["num_segments"], num_share_hashes, 0)
|
||||
self._fieldsize = wbp.fieldsize
|
||||
self._fieldstruct = wbp.fieldstruct
|
||||
self.guessed_offsets = wbp._offsets
|
||||
|
@ -76,19 +76,22 @@ limitations described in #346.
|
||||
|
||||
FORCE_V2 = False # set briefly by unit tests to make small-sized V2 shares
|
||||
|
||||
def make_write_bucket_proxy(rref, data_size, block_size, num_segments,
|
||||
num_share_hashes, uri_extension_size_max, nodeid):
|
||||
def make_write_bucket_proxy(rref, server,
|
||||
data_size, block_size, num_segments,
|
||||
num_share_hashes, uri_extension_size_max):
|
||||
# Use layout v1 for small files, so they'll be readable by older versions
|
||||
# (<tahoe-1.3.0). Use layout v2 for large files; they'll only be readable
|
||||
# by tahoe-1.3.0 or later.
|
||||
try:
|
||||
if FORCE_V2:
|
||||
raise FileTooLargeError
|
||||
wbp = WriteBucketProxy(rref, data_size, block_size, num_segments,
|
||||
num_share_hashes, uri_extension_size_max, nodeid)
|
||||
wbp = WriteBucketProxy(rref, server,
|
||||
data_size, block_size, num_segments,
|
||||
num_share_hashes, uri_extension_size_max)
|
||||
except FileTooLargeError:
|
||||
wbp = WriteBucketProxy_v2(rref, data_size, block_size, num_segments,
|
||||
num_share_hashes, uri_extension_size_max, nodeid)
|
||||
wbp = WriteBucketProxy_v2(rref, server,
|
||||
data_size, block_size, num_segments,
|
||||
num_share_hashes, uri_extension_size_max)
|
||||
return wbp
|
||||
|
||||
class WriteBucketProxy:
|
||||
@ -96,14 +99,13 @@ class WriteBucketProxy:
|
||||
fieldsize = 4
|
||||
fieldstruct = ">L"
|
||||
|
||||
def __init__(self, rref, data_size, block_size, num_segments,
|
||||
num_share_hashes, uri_extension_size_max, nodeid,
|
||||
pipeline_size=50000):
|
||||
def __init__(self, rref, server, data_size, block_size, num_segments,
|
||||
num_share_hashes, uri_extension_size_max, pipeline_size=50000):
|
||||
self._rref = rref
|
||||
self._server = server
|
||||
self._data_size = data_size
|
||||
self._block_size = block_size
|
||||
self._num_segments = num_segments
|
||||
self._nodeid = nodeid
|
||||
|
||||
effective_segments = mathutil.next_power_of_k(num_segments,2)
|
||||
self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
|
||||
@ -161,11 +163,7 @@ class WriteBucketProxy:
|
||||
self._offset_data = offset_data
|
||||
|
||||
def __repr__(self):
|
||||
if self._nodeid:
|
||||
nodeid_s = idlib.nodeid_b2a(self._nodeid)
|
||||
else:
|
||||
nodeid_s = "[None]"
|
||||
return "<WriteBucketProxy for node %s>" % nodeid_s
|
||||
return "<WriteBucketProxy for node %s>" % self._server.get_name()
|
||||
|
||||
def put_header(self):
|
||||
return self._write(0, self._offset_data)
|
||||
@ -247,10 +245,10 @@ class WriteBucketProxy:
|
||||
return self._rref.callRemoteOnly("abort")
|
||||
|
||||
|
||||
def get_servername(self):
|
||||
return self._server.get_name()
|
||||
def get_peerid(self):
|
||||
if self._nodeid:
|
||||
return self._nodeid
|
||||
return None
|
||||
return self._server.get_serverid()
|
||||
|
||||
class WriteBucketProxy_v2(WriteBucketProxy):
|
||||
fieldsize = 8
|
||||
|
@ -77,10 +77,10 @@ class ServerTracker:
|
||||
self.buckets = {} # k: shareid, v: IRemoteBucketWriter
|
||||
self.sharesize = sharesize
|
||||
|
||||
wbp = layout.make_write_bucket_proxy(None, sharesize,
|
||||
wbp = layout.make_write_bucket_proxy(None, None, sharesize,
|
||||
blocksize, num_segments,
|
||||
num_share_hashes,
|
||||
EXTENSION_SIZE, server.get_serverid())
|
||||
EXTENSION_SIZE)
|
||||
self.wbp_class = wbp.__class__ # to create more of them
|
||||
self.allocated_size = wbp.get_allocated_size()
|
||||
self.blocksize = blocksize
|
||||
@ -120,12 +120,11 @@ class ServerTracker:
|
||||
#log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
|
||||
b = {}
|
||||
for sharenum, rref in buckets.iteritems():
|
||||
bp = self.wbp_class(rref, self.sharesize,
|
||||
bp = self.wbp_class(rref, self._server, self.sharesize,
|
||||
self.blocksize,
|
||||
self.num_segments,
|
||||
self.num_share_hashes,
|
||||
EXTENSION_SIZE,
|
||||
self._server.get_serverid())
|
||||
EXTENSION_SIZE)
|
||||
b[sharenum] = bp
|
||||
self.buckets.update(b)
|
||||
return (alreadygot, set(b.keys()))
|
||||
@ -149,7 +148,7 @@ class ServerTracker:
|
||||
|
||||
|
||||
def str_shareloc(shnum, bucketwriter):
|
||||
return "%s: %s" % (shnum, idlib.shortnodeid_b2a(bucketwriter._nodeid),)
|
||||
return "%s: %s" % (shnum, bucketwriter.get_servername(),)
|
||||
|
||||
class Tahoe2ServerSelector(log.PrefixingLogMixin):
|
||||
|
||||
@ -205,9 +204,9 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
|
||||
num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
|
||||
|
||||
# figure out how much space to ask for
|
||||
wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
|
||||
num_share_hashes, EXTENSION_SIZE,
|
||||
None)
|
||||
wbp = layout.make_write_bucket_proxy(None, None,
|
||||
share_size, 0, num_segments,
|
||||
num_share_hashes, EXTENSION_SIZE)
|
||||
allocated_size = wbp.get_allocated_size()
|
||||
all_servers = storage_broker.get_servers_for_psi(storage_index)
|
||||
if not all_servers:
|
||||
|
@ -136,12 +136,12 @@ class BucketProxy(unittest.TestCase):
|
||||
|
||||
def test_create(self):
|
||||
bw, rb, sharefname = self.make_bucket("test_create", 500)
|
||||
bp = WriteBucketProxy(rb,
|
||||
bp = WriteBucketProxy(rb, None,
|
||||
data_size=300,
|
||||
block_size=10,
|
||||
num_segments=5,
|
||||
num_share_hashes=3,
|
||||
uri_extension_size_max=500, nodeid=None)
|
||||
uri_extension_size_max=500)
|
||||
self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)
|
||||
|
||||
def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
|
||||
@ -167,13 +167,12 @@ class BucketProxy(unittest.TestCase):
|
||||
uri_extension = "s" + "E"*498 + "e"
|
||||
|
||||
bw, rb, sharefname = self.make_bucket(name, sharesize)
|
||||
bp = wbp_class(rb,
|
||||
bp = wbp_class(rb, None,
|
||||
data_size=95,
|
||||
block_size=25,
|
||||
num_segments=4,
|
||||
num_share_hashes=3,
|
||||
uri_extension_size_max=len(uri_extension),
|
||||
nodeid=None)
|
||||
uri_extension_size_max=len(uri_extension))
|
||||
|
||||
d = bp.put_header()
|
||||
d.addCallback(lambda res: bp.put_block(0, "a"*25))
|
||||
|
Loading…
Reference in New Issue
Block a user