mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-02-21 10:01:54 +00:00
mutable: added send-messages-to-peers code, about 70% done. No recovery code yet.
This commit is contained in:
parent
e08b091d9f
commit
fade06ef4d
@ -307,6 +307,12 @@ class Publish(ShareFormattingMixin):
|
|||||||
return (seqnum, root_hash, final_shares)
|
return (seqnum, root_hash, final_shares)
|
||||||
|
|
||||||
|
|
||||||
|
def _pack_checkstring(self, seqnum, root_hash):
|
||||||
|
return struct.pack(">BQ32s",
|
||||||
|
0, # version,
|
||||||
|
seqnum,
|
||||||
|
root_hash)
|
||||||
|
|
||||||
def _pack_prefix(self, seqnum, root_hash,
|
def _pack_prefix(self, seqnum, root_hash,
|
||||||
required_shares, total_shares,
|
required_shares, total_shares,
|
||||||
segment_size, data_length):
|
segment_size, data_length):
|
||||||
@ -419,7 +425,7 @@ class Publish(ShareFormattingMixin):
|
|||||||
(peerid, seqnum, R) = oldplace
|
(peerid, seqnum, R) = oldplace
|
||||||
if seqnum >= new_seqnum:
|
if seqnum >= new_seqnum:
|
||||||
raise UncoordinatedWriteError()
|
raise UncoordinatedWriteError()
|
||||||
target_map[shnum].add(oldplace)
|
target_map.add(shnum, oldplace)
|
||||||
shares_per_peer.add(peerid, shnum)
|
shares_per_peer.add(peerid, shnum)
|
||||||
if shnum in shares_needing_homes:
|
if shnum in shares_needing_homes:
|
||||||
shares_needing_homes.remove(shnum)
|
shares_needing_homes.remove(shnum)
|
||||||
@ -444,4 +450,83 @@ class Publish(ShareFormattingMixin):
|
|||||||
|
|
||||||
assert not shares_needing_homes
|
assert not shares_needing_homes
|
||||||
|
|
||||||
return target_map
|
return (target_map, peer_storage_servers)
|
||||||
|
|
||||||
|
def _send_shares(self, (target_map, peer_storage_servers) ):
|
||||||
|
# we're finally ready to send out our shares. If we encounter any
|
||||||
|
# surprises here, it's because somebody else is writing at the same
|
||||||
|
# time. (Note: in the future, when we remove the _query_peers() step
|
||||||
|
# and instead speculate about [or remember] which shares are where,
|
||||||
|
# surprises here are *not* indications of UncoordinatedWriteError,
|
||||||
|
# and we'll need to respond to them more gracefully.
|
||||||
|
|
||||||
|
my_checkstring = self._pack_checkstring(self._new_seqnum,
|
||||||
|
self._new_root_hash)
|
||||||
|
peer_messages = {}
|
||||||
|
expected_old_shares = {}
|
||||||
|
|
||||||
|
for shnum, peers in target_map.items():
|
||||||
|
for (peerid, old_seqnum, old_root_hash) in peers:
|
||||||
|
testv = [(0, len(my_checkstring), "ge", my_checkstring)]
|
||||||
|
new_share = self._new_shares[shnum]
|
||||||
|
writev = [(0, new_share)]
|
||||||
|
if peerid not in peer_messages:
|
||||||
|
peer_messages[peerid] = {}
|
||||||
|
peer_messages[peerid][shnum] = (testv, writev, None)
|
||||||
|
if peerid not in expected_old_shares:
|
||||||
|
expected_old_shares[peerid] = {}
|
||||||
|
expected_old_shares[peerid][shnum] = (old_seqnum, old_root_hash)
|
||||||
|
|
||||||
|
read_vector = [(0, len(my_checkstring))]
|
||||||
|
|
||||||
|
dl = []
|
||||||
|
# ok, send the messages!
|
||||||
|
self._surprised = False
|
||||||
|
for peerid, tw_vectors in peer_messages.items():
|
||||||
|
d = self._do_testreadwrite(peerid, peer_storage_servers,
|
||||||
|
tw_vectors, read_vector)
|
||||||
|
d.addCallback(self._got_write_answer,
|
||||||
|
peerid, expected_old_shares[peerid])
|
||||||
|
dl.append(d)
|
||||||
|
|
||||||
|
d = defer.DeferredList(dl)
|
||||||
|
d.addCallback(lambda res: self._surprised)
|
||||||
|
return d
|
||||||
|
|
||||||
|
def _do_testreadwrite(self, peerid, peer_storage_servers,
|
||||||
|
tw_vectors, read_vector):
|
||||||
|
conn = peer_storage_servers[peerid]
|
||||||
|
storage_index = self._node._uri.storage_index
|
||||||
|
# TOTALLY BOGUS renew/cancel secrets
|
||||||
|
write_enabler = hashutil.tagged_hash("WEFOO", storage_index)
|
||||||
|
renew_secret = hashutil.tagged_hash("renewFOO", storage_index)
|
||||||
|
cancel_secret = hashutil.tagged_hash("cancelFOO", storage_index)
|
||||||
|
|
||||||
|
d = conn.callRemote("slot_testv_and_readv_and_writev",
|
||||||
|
storage_index,
|
||||||
|
(write_enabler, renew_secret, cancel_secret),
|
||||||
|
tw_vectors,
|
||||||
|
read_vector)
|
||||||
|
return d
|
||||||
|
|
||||||
|
def _got_write_answer(self, answer, peerid, expected_old_shares):
|
||||||
|
wrote, read_data = answer
|
||||||
|
surprised = False
|
||||||
|
if not wrote:
|
||||||
|
# surprise! our testv failed, so the write did not happen
|
||||||
|
surprised = True
|
||||||
|
for shnum, (old_checkstring,) in read_data.items():
|
||||||
|
if shnum not in expected_old_shares:
|
||||||
|
# surprise! there was a share we didn't know about
|
||||||
|
surprised = True
|
||||||
|
else:
|
||||||
|
seqnum, root_hash = expected_old_shares[shnum]
|
||||||
|
if seqnum is not None:
|
||||||
|
expected_checkstring = self._pack_checkstring(seqnum,
|
||||||
|
root_hash)
|
||||||
|
if old_checkstring != expected_checkstring:
|
||||||
|
# surprise! somebody modified the share
|
||||||
|
surprised = True
|
||||||
|
if surprised:
|
||||||
|
self._surprised = True
|
||||||
|
|
||||||
|
@ -72,6 +72,18 @@ class FakePublish(mutable.Publish):
|
|||||||
shares = self._peers[peerid]
|
shares = self._peers[peerid]
|
||||||
return defer.succeed(shares)
|
return defer.succeed(shares)
|
||||||
|
|
||||||
|
def _do_testreadwrite(self, conn, peerid, tw_vectors, read_vector):
|
||||||
|
# always-pass: parrot the test vectors back to them.
|
||||||
|
readv = {}
|
||||||
|
for shnum, (testv, datav, new_length) in tw_vectors.items():
|
||||||
|
for (offset, length, op, specimen) in testv:
|
||||||
|
assert op in ("le", "eq", "ge")
|
||||||
|
readv[shnum] = [ specimen
|
||||||
|
for (offset, length, op, specimen)
|
||||||
|
in testv ]
|
||||||
|
answer = (True, readv)
|
||||||
|
return defer.succeed(answer)
|
||||||
|
|
||||||
|
|
||||||
class FakeNewDirectoryNode(dirnode2.NewDirectoryNode):
|
class FakeNewDirectoryNode(dirnode2.NewDirectoryNode):
|
||||||
filenode_class = FakeFilenode
|
filenode_class = FakeFilenode
|
||||||
@ -268,7 +280,7 @@ class Publish(unittest.TestCase):
|
|||||||
total_shares = 10
|
total_shares = 10
|
||||||
d = p._query_peers( (new_seqnum, new_root_hash, new_seqnum),
|
d = p._query_peers( (new_seqnum, new_root_hash, new_seqnum),
|
||||||
total_shares)
|
total_shares)
|
||||||
def _done(target_map):
|
def _done( (target_map, peer_storage_servers) ):
|
||||||
shares_per_peer = {}
|
shares_per_peer = {}
|
||||||
for shnum in target_map:
|
for shnum in target_map:
|
||||||
for (peerid, old_seqnum, old_R) in target_map[shnum]:
|
for (peerid, old_seqnum, old_R) in target_map[shnum]:
|
||||||
@ -293,7 +305,7 @@ class Publish(unittest.TestCase):
|
|||||||
total_shares = 10
|
total_shares = 10
|
||||||
d = p._query_peers( (new_seqnum, new_root_hash, new_seqnum),
|
d = p._query_peers( (new_seqnum, new_root_hash, new_seqnum),
|
||||||
total_shares)
|
total_shares)
|
||||||
def _done(target_map):
|
def _done( (target_map, peer_storage_servers) ):
|
||||||
shares_per_peer = {}
|
shares_per_peer = {}
|
||||||
for shnum in target_map:
|
for shnum in target_map:
|
||||||
for (peerid, old_seqnum, old_R) in target_map[shnum]:
|
for (peerid, old_seqnum, old_R) in target_map[shnum]:
|
||||||
@ -320,6 +332,33 @@ class Publish(unittest.TestCase):
|
|||||||
total_shares)
|
total_shares)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
def setup_for_write(self, num_peers, total_shares):
|
||||||
|
c, p = self.setup_for_sharemap(num_peers)
|
||||||
|
# make some fake shares
|
||||||
|
CONTENTS = "some initial contents"
|
||||||
|
shares_and_ids = ( ["%07d" % i for i in range(10)], range(10) )
|
||||||
|
d = defer.maybeDeferred(p._generate_shares,
|
||||||
|
(shares_and_ids,
|
||||||
|
3, total_shares,
|
||||||
|
21, # segsize
|
||||||
|
len(CONTENTS),
|
||||||
|
"IV"*8),
|
||||||
|
3, # seqnum
|
||||||
|
FakePrivKey(), "encprivkey", FakePubKey(),
|
||||||
|
)
|
||||||
|
return d, p
|
||||||
|
|
||||||
|
def test_write(self):
|
||||||
|
total_shares = 10
|
||||||
|
d, p = self.setup_for_write(20, total_shares)
|
||||||
|
d.addCallback(p._query_peers, total_shares)
|
||||||
|
d.addCallback(p._send_shares)
|
||||||
|
def _done(surprised):
|
||||||
|
self.failIf(surprised, "surprised!")
|
||||||
|
d.addCallback(_done)
|
||||||
|
return d
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class FakePubKey:
|
class FakePubKey:
|
||||||
def serialize(self):
|
def serialize(self):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user