mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2024-12-19 13:07:56 +00:00
storage: add readv_slots: get data from all shares
This commit is contained in:
parent
6137329f05
commit
8f21424449
@ -89,6 +89,7 @@ TestVector = ListOf(TupleOf(int, int, str, str))
|
||||
# you should use length==len(specimen) for everything except nop
|
||||
DataVector = ListOf(TupleOf(int, ShareData))
|
||||
# (offset, data). This limits us to 30 writes of 1MiB each per call
|
||||
ReadVector = ListOf(TupleOf(int, int))
|
||||
TestResults = ListOf(str)
|
||||
# returns data[offset:offset+length] for each element of TestVector
|
||||
|
||||
@ -209,6 +210,10 @@ class RIStorageServer(RemoteInterface):
|
||||
of the slot mentioned."""
|
||||
return DictOf(int, RIMutableSlot, maxKeys=MAX_BUCKETS)
|
||||
|
||||
def readv_slots(storage_index=StorageIndex, readv=ReadVector):
|
||||
"""Read a vector from all shares associated with the given storage
|
||||
index. Returns a dictionary with one key per share."""
|
||||
return DictOf(int, DataVector) # shnum -> results
|
||||
|
||||
class IStorageBucketWriter(Interface):
|
||||
def put_block(segmentnum=int, data=ShareData):
|
||||
|
@ -554,6 +554,14 @@ class MutableShareFile(Referenceable):
|
||||
f.close()
|
||||
return data
|
||||
|
||||
def readv(self, readv):
|
||||
datav = []
|
||||
f = open(self.home, 'rb')
|
||||
for (offset, length) in readv:
|
||||
datav.append(self._read_share_data(f, offset, length))
|
||||
f.close()
|
||||
return datav
|
||||
|
||||
def remote_get_length(self):
|
||||
f = open(self.home, 'rb')
|
||||
data_length = self._read_data_length(f)
|
||||
@ -878,6 +886,24 @@ class StorageServer(service.MultiService, Referenceable):
|
||||
slots[sharenum] = MutableShareFile(filename)
|
||||
return slots
|
||||
|
||||
def remote_readv_slots(self, storage_index, readv):
|
||||
si_s = idlib.b2a(storage_index)
|
||||
# shares exist if there is a file for them
|
||||
bucketdir = os.path.join(self.sharedir, si_s)
|
||||
if not os.path.isdir(bucketdir):
|
||||
return {}
|
||||
datavs = {}
|
||||
for sharenum_s in os.listdir(bucketdir):
|
||||
try:
|
||||
sharenum = int(sharenum_s)
|
||||
except ValueError:
|
||||
continue
|
||||
filename = os.path.join(bucketdir, sharenum_s)
|
||||
msf = MutableShareFile(filename)
|
||||
datavs[sharenum] = msf.readv(readv)
|
||||
return datavs
|
||||
|
||||
|
||||
|
||||
# the code before here runs on the storage server, not the client
|
||||
# the code beyond here runs on the client, not the storage server
|
||||
|
@ -687,6 +687,21 @@ class MutableServer(unittest.TestCase):
|
||||
self.failUnlessEqual(s0.remote_read(0, 100), data)
|
||||
s0.remote_testv_and_writev(WE, [], [(0,data)], None)
|
||||
|
||||
def test_readv(self):
|
||||
ss = self.create("test_allocate")
|
||||
shares = self.allocate(ss, "si1", "we1", self._secret.next(),
|
||||
set([0,1,2]), 100)
|
||||
WE = self.write_enabler("we1")
|
||||
data = [("%d" % i) * 100 for i in range(3)]
|
||||
for i in range(3):
|
||||
rc = shares[i].remote_testv_and_writev(WE, [], [(0, data[i])],
|
||||
new_length=None)
|
||||
self.failUnlessEqual(rc, (True, []))
|
||||
answer = ss.remote_readv_slots("si1", [(0, 10)])
|
||||
self.failUnlessEqual(answer, {0: ["0"*10],
|
||||
1: ["1"*10],
|
||||
2: ["2"*10]})
|
||||
|
||||
def compare_leases_without_timestamps(self, a, b):
|
||||
self.failUnlessEqual(len(a), len(b))
|
||||
for i in range(len(a)):
|
||||
|
Loading…
Reference in New Issue
Block a user