diff --git a/src/allmydata/mutable/layout.py b/src/allmydata/mutable/layout.py
index 9565843f6..02069edd1 100644
--- a/src/allmydata/mutable/layout.py
+++ b/src/allmydata/mutable/layout.py
@@ -1,13 +1,79 @@
-import struct
+import struct, math
 from allmydata.mutable.common import NeedMoreDataError, UnknownVersionError
+from allmydata.interfaces import HASH_SIZE, SALT_SIZE, SDMF_VERSION, \
+                                 MDMF_VERSION, IMutableSlotWriter
+from allmydata.util import mathutil, observer
+from twisted.python import failure
+from twisted.internet import defer
+from zope.interface import implements
+
+
+# These strings describe the format of the packed structs they help process.
+# Here's what they mean:
+#
+# PREFIX:
+#  >: Big-endian byte order; the most significant byte is first (leftmost).
+#  B: The version information; an 8-bit version identifier. Stored as
+#     an unsigned char. This is currently 0 (SDMF); our modifications
+#     will turn it into 1 (MDMF).
+#  Q: The sequence number; this is sort of like a revision history for
+#     mutable files; they start at 1 and increase as they are changed after
+#     being uploaded. Stored as an unsigned long long, which is 8 bytes in
+#     length.
+# 32s: The root hash of the share hash tree. We use sha-256d, so we use 32
+#      characters = 32 bytes to store the value.
+# 16s: The salt for the readkey. This is a 16-byte random value, stored as
+#      16 characters.
+#
+# SIGNED_PREFIX additions, things that are covered by the signature:
+#  B: The "k" encoding parameter. We store this as an 8-bit character,
+#     which is convenient because our erasure coding scheme cannot
+#     encode if you ask for more than 255 pieces.
+#  B: The "N" encoding parameter. Stored as an 8-bit character for the
+#     same reasons as above.
+#  Q: The segment size of the uploaded file. This will essentially be the
+#     length of the file in SDMF. An unsigned long long, so we can store
+#     files of quite large size.
+#  Q: The data length of the uploaded file. Modulo padding, this will be
+#     the same as the segment size field. Like the segment size field, it
+#     is an unsigned long long and can be quite large.
+#
+# HEADER additions:
+#  L: The offset of the signature. An unsigned long.
+#  L: The offset of the share hash chain. An unsigned long.
+#  L: The offset of the block hash tree. An unsigned long.
+#  L: The offset of the share data. An unsigned long.
+#  Q: The offset of the encrypted private key. An unsigned long long, to
+#     account for the possibility of a lot of share data.
+#  Q: The offset of the EOF. An unsigned long long, to account for the
+#     possibility of a lot of share data.
+#
+# After all of these, we have the following:
+#  - The verification key: Occupies the space between the end of the header
+#    and the start of the signature (i.e.: data[HEADER_LENGTH:o['signature']]).
+#  - The signature, which goes from the signature offset to the share hash
+#    chain offset.
+#  - The share hash chain, which goes from the share hash chain offset to
+#    the block hash tree offset.
+#  - The share data, which goes from the share data offset to the encrypted
+#    private key offset.
+#  - The encrypted private key, which goes from the encrypted private key
+#    offset to the end of the file.
+#
+# The block hash tree in this encoding has only one leaf, so the offset of
+# the share data will be 32 bytes more than the offset of the block hash tree.
+# Given this, we may need to check to see how many bytes a reasonably sized
+# block hash tree will take up.
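As a rough illustration of how the SDMF format strings described above (and defined just below) translate into concrete sizes and fields, here is a minimal sketch using only the standard-library struct module; the describe_header helper is hypothetical and not part of this patch:

    import struct

    PREFIX = ">BQ32s16s"              # version, seqnum, root hash, IV   -> 57 bytes
    SIGNED_PREFIX = ">BQ32s16s BBQQ"  # + k, N, segsize, datalen         -> 75 bytes
    HEADER = ">BQ32s16s BBQQ LLLLQQ"  # + the six offsets                -> 107 bytes

    assert struct.calcsize(PREFIX) == 57
    assert struct.calcsize(SIGNED_PREFIX) == 75
    assert struct.calcsize(HEADER) == 107

    def describe_header(data):
        # Unpack the fixed-size SDMF header laid out above.
        (version, seqnum, root_hash, IV,
         k, N, segsize, datalen,
         o_signature, o_share_hash_chain, o_block_hash_tree,
         o_share_data, o_enc_privkey, o_EOF) = struct.unpack(
            HEADER, data[:struct.calcsize(HEADER)])
        return dict(version=version, seqnum=seqnum, k=k, N=N,
                    segsize=segsize, datalen=datalen,
                    signature=o_signature, share_hash_chain=o_share_hash_chain,
                    block_hash_tree=o_block_hash_tree, share_data=o_share_data,
                    enc_privkey=o_enc_privkey, EOF=o_EOF)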
PREFIX = ">BQ32s16s" # each version has a different prefix SIGNED_PREFIX = ">BQ32s16s BBQQ" # this is covered by the signature SIGNED_PREFIX_LENGTH = struct.calcsize(SIGNED_PREFIX) HEADER = ">BQ32s16s BBQQ LLLLQQ" # includes offsets HEADER_LENGTH = struct.calcsize(HEADER) +OFFSETS = ">LLLLQQ" +OFFSETS_LENGTH = struct.calcsize(OFFSETS) +# These are still used for some tests. def unpack_header(data): o = {} (version, @@ -23,30 +89,6 @@ def unpack_header(data): o['EOF']) = struct.unpack(HEADER, data[:HEADER_LENGTH]) return (version, seqnum, root_hash, IV, k, N, segsize, datalen, o) -def unpack_prefix_and_signature(data): - assert len(data) >= HEADER_LENGTH, len(data) - prefix = data[:SIGNED_PREFIX_LENGTH] - - (version, - seqnum, - root_hash, - IV, - k, N, segsize, datalen, - o) = unpack_header(data) - - if version != 0: - raise UnknownVersionError("got mutable share version %d, but I only understand version 0" % version) - - if len(data) < o['share_hash_chain']: - raise NeedMoreDataError(o['share_hash_chain'], - o['enc_privkey'], o['EOF']-o['enc_privkey']) - - pubkey_s = data[HEADER_LENGTH:o['signature']] - signature = data[o['signature']:o['share_hash_chain']] - - return (seqnum, root_hash, IV, k, N, segsize, datalen, - pubkey_s, signature, prefix) - def unpack_share(data): assert len(data) >= HEADER_LENGTH o = {} @@ -94,45 +136,6 @@ def unpack_share(data): pubkey, signature, share_hash_chain, block_hash_tree, share_data, enc_privkey) -def unpack_share_data(verinfo, hash_and_data): - (seqnum, root_hash, IV, segsize, datalength, k, N, prefix, o_t) = verinfo - - # hash_and_data starts with the share_hash_chain, so figure out what the - # offsets really are - o = dict(o_t) - o_share_hash_chain = 0 - o_block_hash_tree = o['block_hash_tree'] - o['share_hash_chain'] - o_share_data = o['share_data'] - o['share_hash_chain'] - o_enc_privkey = o['enc_privkey'] - o['share_hash_chain'] - - share_hash_chain_s = hash_and_data[o_share_hash_chain:o_block_hash_tree] - share_hash_format = ">H32s" - hsize = struct.calcsize(share_hash_format) - assert len(share_hash_chain_s) % hsize == 0, len(share_hash_chain_s) - share_hash_chain = [] - for i in range(0, len(share_hash_chain_s), hsize): - chunk = share_hash_chain_s[i:i+hsize] - (hid, h) = struct.unpack(share_hash_format, chunk) - share_hash_chain.append( (hid, h) ) - share_hash_chain = dict(share_hash_chain) - block_hash_tree_s = hash_and_data[o_block_hash_tree:o_share_data] - assert len(block_hash_tree_s) % 32 == 0, len(block_hash_tree_s) - block_hash_tree = [] - for i in range(0, len(block_hash_tree_s), 32): - block_hash_tree.append(block_hash_tree_s[i:i+32]) - - share_data = hash_and_data[o_share_data:o_enc_privkey] - - return (share_hash_chain, block_hash_tree, share_data) - - -def pack_checkstring(seqnum, root_hash, IV): - return struct.pack(PREFIX, - 0, # version, - seqnum, - root_hash, - IV) - def unpack_checkstring(checkstring): cs_len = struct.calcsize(PREFIX) version, seqnum, root_hash, IV = struct.unpack(PREFIX, checkstring[:cs_len]) @@ -140,21 +143,6 @@ def unpack_checkstring(checkstring): raise UnknownVersionError("got mutable share version %d, but I only understand version 0" % version) return (seqnum, root_hash, IV) -def pack_prefix(seqnum, root_hash, IV, - required_shares, total_shares, - segment_size, data_length): - prefix = struct.pack(SIGNED_PREFIX, - 0, # version, - seqnum, - root_hash, - IV, - - required_shares, - total_shares, - segment_size, - data_length, - ) - return prefix def pack_offsets(verification_key_length, signature_length, 
share_hash_chain_length, block_hash_tree_length, @@ -201,3 +189,1581 @@ def pack_share(prefix, verification_key, signature, encprivkey]) return final_share +def pack_prefix(seqnum, root_hash, IV, + required_shares, total_shares, + segment_size, data_length): + prefix = struct.pack(SIGNED_PREFIX, + 0, # version, + seqnum, + root_hash, + IV, + required_shares, + total_shares, + segment_size, + data_length, + ) + return prefix + + +class SDMFSlotWriteProxy: + implements(IMutableSlotWriter) + """ + I represent a remote write slot for an SDMF mutable file. I build a + share in memory, and then write it in one piece to the remote + server. This mimics how SDMF shares were built before MDMF (and the + new MDMF uploader), but provides that functionality in a way that + allows the MDMF uploader to be built without much special-casing for + file format, which makes the uploader code more readable. + """ + def __init__(self, + shnum, + rref, # a remote reference to a storage server + storage_index, + secrets, # (write_enabler, renew_secret, cancel_secret) + seqnum, # the sequence number of the mutable file + required_shares, + total_shares, + segment_size, + data_length): # the length of the original file + self.shnum = shnum + self._rref = rref + self._storage_index = storage_index + self._secrets = secrets + self._seqnum = seqnum + self._required_shares = required_shares + self._total_shares = total_shares + self._segment_size = segment_size + self._data_length = data_length + + # This is an SDMF file, so it should have only one segment, so, + # modulo padding of the data length, the segment size and the + # data length should be the same. + expected_segment_size = mathutil.next_multiple(data_length, + self._required_shares) + assert expected_segment_size == segment_size + + self._block_size = self._segment_size / self._required_shares + + # This is meant to mimic how SDMF files were built before MDMF + # entered the picture: we generate each share in its entirety, + # then push it off to the storage server in one write. When + # callers call set_*, they are just populating this dict. + # finish_publishing will stitch these pieces together into a + # coherent share, and then write the coherent share to the + # storage server. + self._share_pieces = {} + + # This tells the write logic what checkstring to use when + # writing remote shares. + self._testvs = [] + + self._readvs = [(0, struct.calcsize(PREFIX))] + + + def set_checkstring(self, checkstring_or_seqnum, + root_hash=None, + salt=None): + """ + Set the checkstring that I will pass to the remote server when + writing. + + @param checkstring_or_seqnum: A packed checkstring to use, + or a sequence number. I will treat this as a checkstr + + Note that implementations can differ in which semantics they + wish to support for set_checkstring -- they can, for example, + build the checkstring themselves from its constituents, or + some other thing. + """ + if root_hash and salt: + checkstring = struct.pack(PREFIX, + 0, + checkstring_or_seqnum, + root_hash, + salt) + else: + checkstring = checkstring_or_seqnum + self._testvs = [(0, len(checkstring), "eq", checkstring)] + + + def get_checkstring(self): + """ + Get the checkstring that I think currently exists on the remote + server. + """ + if self._testvs: + return self._testvs[0][3] + return "" + + + def put_block(self, data, segnum, salt): + """ + Add a block and salt to the share. 
+ """ + # SDMF files have only one segment + assert segnum == 0 + assert len(data) == self._block_size + assert len(salt) == SALT_SIZE + + self._share_pieces['sharedata'] = data + self._share_pieces['salt'] = salt + + # TODO: Figure out something intelligent to return. + return defer.succeed(None) + + + def put_encprivkey(self, encprivkey): + """ + Add the encrypted private key to the share. + """ + self._share_pieces['encprivkey'] = encprivkey + + return defer.succeed(None) + + + def put_blockhashes(self, blockhashes): + """ + Add the block hash tree to the share. + """ + assert isinstance(blockhashes, list) + for h in blockhashes: + assert len(h) == HASH_SIZE + + # serialize the blockhashes, then set them. + blockhashes_s = "".join(blockhashes) + self._share_pieces['block_hash_tree'] = blockhashes_s + + return defer.succeed(None) + + + def put_sharehashes(self, sharehashes): + """ + Add the share hash chain to the share. + """ + assert isinstance(sharehashes, dict) + for h in sharehashes.itervalues(): + assert len(h) == HASH_SIZE + + # serialize the sharehashes, then set them. + sharehashes_s = "".join([struct.pack(">H32s", i, sharehashes[i]) + for i in sorted(sharehashes.keys())]) + self._share_pieces['share_hash_chain'] = sharehashes_s + + return defer.succeed(None) + + + def put_root_hash(self, root_hash): + """ + Add the root hash to the share. + """ + assert len(root_hash) == HASH_SIZE + + self._share_pieces['root_hash'] = root_hash + + return defer.succeed(None) + + + def put_salt(self, salt): + """ + Add a salt to an empty SDMF file. + """ + assert len(salt) == SALT_SIZE + + self._share_pieces['salt'] = salt + self._share_pieces['sharedata'] = "" + + + def get_signable(self): + """ + Return the part of the share that needs to be signed. + + SDMF writers need to sign the packed representation of the + first eight fields of the remote share, that is: + - version number (0) + - sequence number + - root of the share hash tree + - salt + - k + - n + - segsize + - datalen + + This method is responsible for returning that to callers. + """ + return struct.pack(SIGNED_PREFIX, + 0, + self._seqnum, + self._share_pieces['root_hash'], + self._share_pieces['salt'], + self._required_shares, + self._total_shares, + self._segment_size, + self._data_length) + + + def put_signature(self, signature): + """ + Add the signature to the share. + """ + self._share_pieces['signature'] = signature + + return defer.succeed(None) + + + def put_verification_key(self, verification_key): + """ + Add the verification key to the share. + """ + self._share_pieces['verification_key'] = verification_key + + return defer.succeed(None) + + + def get_verinfo(self): + """ + I return my verinfo tuple. This is used by the ServermapUpdater + to keep track of versions of mutable files. + + The verinfo tuple for MDMF files contains: + - seqnum + - root hash + - a blank (nothing) + - segsize + - datalen + - k + - n + - prefix (the thing that you sign) + - a tuple of offsets + + We include the nonce in MDMF to simplify processing of version + information tuples. + + The verinfo tuple for SDMF files is the same, but contains a + 16-byte IV instead of a hash of salts. 
+ """ + return (self._seqnum, + self._share_pieces['root_hash'], + self._share_pieces['salt'], + self._segment_size, + self._data_length, + self._required_shares, + self._total_shares, + self.get_signable(), + self._get_offsets_tuple()) + + def _get_offsets_dict(self): + post_offset = HEADER_LENGTH + offsets = {} + + verification_key_length = len(self._share_pieces['verification_key']) + o1 = offsets['signature'] = post_offset + verification_key_length + + signature_length = len(self._share_pieces['signature']) + o2 = offsets['share_hash_chain'] = o1 + signature_length + + share_hash_chain_length = len(self._share_pieces['share_hash_chain']) + o3 = offsets['block_hash_tree'] = o2 + share_hash_chain_length + + block_hash_tree_length = len(self._share_pieces['block_hash_tree']) + o4 = offsets['share_data'] = o3 + block_hash_tree_length + + share_data_length = len(self._share_pieces['sharedata']) + o5 = offsets['enc_privkey'] = o4 + share_data_length + + encprivkey_length = len(self._share_pieces['encprivkey']) + offsets['EOF'] = o5 + encprivkey_length + return offsets + + + def _get_offsets_tuple(self): + offsets = self._get_offsets_dict() + return tuple([(key, value) for key, value in offsets.items()]) + + + def _pack_offsets(self): + offsets = self._get_offsets_dict() + return struct.pack(">LLLLQQ", + offsets['signature'], + offsets['share_hash_chain'], + offsets['block_hash_tree'], + offsets['share_data'], + offsets['enc_privkey'], + offsets['EOF']) + + + def finish_publishing(self): + """ + Do anything necessary to finish writing the share to a remote + server. I require that no further publishing needs to take place + after this method has been called. + """ + for k in ["sharedata", "encprivkey", "signature", "verification_key", + "share_hash_chain", "block_hash_tree"]: + assert k in self._share_pieces + # This is the only method that actually writes something to the + # remote server. + # First, we need to pack the share into data that we can write + # to the remote server in one write. + offsets = self._pack_offsets() + prefix = self.get_signable() + final_share = "".join([prefix, + offsets, + self._share_pieces['verification_key'], + self._share_pieces['signature'], + self._share_pieces['share_hash_chain'], + self._share_pieces['block_hash_tree'], + self._share_pieces['sharedata'], + self._share_pieces['encprivkey']]) + + # Our only data vector is going to be writing the final share, + # in its entirely. + datavs = [(0, final_share)] + + if not self._testvs: + # Our caller has not provided us with another checkstring + # yet, so we assume that we are writing a new share, and set + # a test vector that will allow a new share to be written. + self._testvs = [] + self._testvs.append(tuple([0, 1, "eq", ""])) + + tw_vectors = {} + tw_vectors[self.shnum] = (self._testvs, datavs, None) + return self._rref.callRemote("slot_testv_and_readv_and_writev", + self._storage_index, + self._secrets, + tw_vectors, + # TODO is it useful to read something? 
+                                     self._readvs)
+
+
+MDMFHEADER = ">BQ32sBBQQ QQQQQQQQ"
+MDMFHEADERWITHOUTOFFSETS = ">BQ32sBBQQ"
+MDMFHEADERSIZE = struct.calcsize(MDMFHEADER)
+MDMFHEADERWITHOUTOFFSETSSIZE = struct.calcsize(MDMFHEADERWITHOUTOFFSETS)
+MDMFCHECKSTRING = ">BQ32s"
+MDMFSIGNABLEHEADER = ">BQ32sBBQQ"
+MDMFOFFSETS = ">QQQQQQQQ"
+MDMFOFFSETS_LENGTH = struct.calcsize(MDMFOFFSETS)
+
+PRIVATE_KEY_SIZE = 1220
+SIGNATURE_SIZE = 260
+VERIFICATION_KEY_SIZE = 292
+# We know we won't have more than 256 shares, and we know that we won't
+# need to store more than lg 256 of them to validate, so that's our
+# bound. We add 1 to the int cast to round to the next integer.
+SHARE_HASH_CHAIN_SIZE = int(math.log(HASH_SIZE * 256)) + 1
+
+class MDMFSlotWriteProxy:
+    implements(IMutableSlotWriter)
+
+    """
+    I represent a remote write slot for an MDMF mutable file.
+
+    I abstract away from my caller the details of block and salt
+    management, and the implementation of the on-disk format for MDMF
+    shares.
+    """
+    # Expected layout, MDMF:
+    # offset:   size:   name:
+    #-- signed part --
+    # 0         1       version number (01)
+    # 1         8       sequence number
+    # 9         32      share tree root hash
+    # 41        1       The "k" encoding parameter
+    # 42        1       The "N" encoding parameter
+    # 43        8       The segment size of the uploaded file
+    # 51        8       The data length of the original plaintext
+    #-- end signed part --
+    # 59        8       The offset of the encrypted private key
+    # 67        8       The offset of the share hash chain
+    # 75        8       The offset of the signature
+    # 83        8       The offset of the verification key
+    # 91        8       The offset of the end of the verification key
+    # 99        8       The offset of the share data
+    # 107       8       The offset of the block hash tree
+    # 115       8       The offset of EOF
+    #
+    # followed by the encrypted private key, share hash chain, signature,
+    # verification key, share data, and block hash tree. We order the
+    # fields that way to make smart downloaders -- downloaders which
+    # preemptively read a big part of the share -- possible.
+    #
+    # The checkstring is the first three fields -- the version number,
+    # sequence number, and root hash. This is consistent in meaning with
+    # what we have with SDMF files, except now instead of using the
+    # literal salt, we use a value derived from all of the salts -- the
+    # root of the share hash tree.
+    #
+    # The salt is stored before the block for each segment. The block
+    # hash tree is computed over the combination of block and salt for
+    # each segment. In this way, we get integrity checking for both
+    # block and salt with the current block hash tree arrangement.
+    #
+    # The ordering of the offsets is different to reflect the dependencies
+    # that we'll run into with an MDMF file. The expected write flow is
+    # something like this:
+    #
+    # 0: Initialize with the sequence number, encoding parameters and
+    #    data length. From this, we can deduce the number of segments,
+    #    and where they should go. We can also figure out where the
+    #    encrypted private key should go, because we can figure out how
+    #    big the share data will be.
+    #
+    # 1: Encrypt, encode, and upload the file in chunks. Do something
+    #    like
+    #
+    #       put_block(data, segnum, salt)
+    #
+    #    to write a block and a salt to the disk. We can do both of
+    #    these operations now because we have enough of the offsets to
+    #    know where to put them.
+    #
+    # 2: Put the encrypted private key. Use:
+    #
+    #       put_encprivkey(encprivkey)
+    #
+    #    Now that we know the length of the private key, we can fill
+    #    in the offset for the block hash tree.
+    #
+    # 3: We're now in a position to upload the block hash tree for
+    #    a share.
+    #    Put that using something like:
+    #
+    #       put_blockhashes(block_hash_tree)
+    #
+    #    Note that block_hash_tree is a list of hashes -- we'll take
+    #    care of the details of serializing that appropriately. When
+    #    we get the block hash tree, we are also in a position to
+    #    calculate the offset for the share hash chain, and fill that
+    #    into the offsets table.
+    #
+    # 4: We're now in a position to upload the share hash chain for
+    #    a share. Do that with something like:
+    #
+    #       put_sharehashes(share_hash_chain)
+    #
+    #    share_hash_chain should be a dictionary mapping shnums to
+    #    32-byte hashes -- the wrapper handles serialization.
+    #    We'll know where to put the signature at this point, also.
+    #    The root of this tree will be put explicitly in the next
+    #    step.
+    #
+    # 5: Before putting the signature, we must first put the
+    #    root_hash. Do this with:
+    #
+    #       put_root_hash(root_hash)
+    #
+    #    In terms of knowing where to put this value, it was always
+    #    possible to place it, but it makes sense semantically to
+    #    place it after the share hash tree, so that's why you do it
+    #    in this order.
+    #
+    # 6: With the root hash put, we can now sign the header. Use:
+    #
+    #       get_signable()
+    #
+    #    to get the part of the header that you want to sign, and use:
+    #
+    #       put_signature(signature)
+    #
+    #    to write your signature to the remote server.
+    #
+    # 7: Add the verification key, and finish. Do:
+    #
+    #       put_verification_key(key)
+    #
+    #    and
+    #
+    #       finish_publishing()
+    #
+    # Checkstring management:
+    #
+    # To write to a mutable slot, we have to provide test vectors to ensure
+    # that we are writing to the same data that we think we are. These
+    # vectors allow us to detect uncoordinated writes; that is, writes
+    # where both we and some other shareholder are writing to the
+    # mutable slot, and to report those back to the parts of the program
+    # doing the writing.
+    #
+    # With SDMF, this was easy -- all of the share data was written in
+    # one go, so it was easy to detect uncoordinated writes, and we only
+    # had to do it once. With MDMF, not all of the file is written at
+    # once.
+    #
+    # If a share is new, we write out as much of the header as we can
+    # before writing out anything else. This gives other writers a
+    # canary that they can use to detect uncoordinated writes, and, if
+    # they do the same thing, gives us the same canary. We then update
+    # the share. We won't be able to write out two fields of the header
+    # -- the share tree hash and the salt hash -- until we finish
+    # writing out the share. We only require the writer to provide the
+    # initial checkstring, and keep track of what it should be after
+    # updates ourselves.
+    #
+    # If we haven't written anything yet, then on the first write (which
+    # will probably be a block + salt of a share), we'll also write out
+    # the header. On subsequent passes, we'll expect to see the header.
+    # This changes in two places:
+    #
+    #   - When we write out the salt hash
+    #   - When we write out the root of the share hash tree
+    #
+    # since these values will change the header. It is possible that we
+    # can just make those be written in one operation to minimize
+    # disruption.
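To make the flow described above concrete, here is a minimal, hypothetical sketch of how a publisher might drive this proxy for a single share; the function name and its parameters (rref, secrets, sign, the precomputed hashes, and so on) are illustrative stand-ins, not part of this patch:

    def upload_one_mdmf_share(rref, storage_index, secrets, shnum,
                              seqnum, k, N, segsize, datalen,
                              blocks_and_salts, encprivkey, blockhashes,
                              sharehashes, root_hash, sign, verification_key):
        proxy = MDMFSlotWriteProxy(shnum, rref, storage_index, secrets,
                                   seqnum, k, N, segsize, datalen)
        # 0/1: queue each (block, salt) pair under its segment number.
        for segnum, (block, salt) in enumerate(blocks_and_salts):
            proxy.put_block(block, segnum, salt)
        # 2: the private key's length fixes the share hash chain offset.
        proxy.put_encprivkey(encprivkey)
        # 3/4: block hash tree, then share hash chain (this order is enforced).
        proxy.put_blockhashes(blockhashes)
        proxy.put_sharehashes(sharehashes)
        # 5: the root hash becomes part of the new checkstring.
        proxy.put_root_hash(root_hash)
        # 6: sign the header, then (7) add the verification key and publish.
        proxy.put_signature(sign(proxy.get_signable()))
        proxy.put_verification_key(verification_key)
        # One remote writev carrying all queued vectors; returns a Deferred.
        return proxy.finish_publishing()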
+ def __init__(self, + shnum, + rref, # a remote reference to a storage server + storage_index, + secrets, # (write_enabler, renew_secret, cancel_secret) + seqnum, # the sequence number of the mutable file + required_shares, + total_shares, + segment_size, + data_length): # the length of the original file + self.shnum = shnum + self._rref = rref + self._storage_index = storage_index + self._seqnum = seqnum + self._required_shares = required_shares + assert self.shnum >= 0 and self.shnum < total_shares + self._total_shares = total_shares + # We build up the offset table as we write things. It is the + # last thing we write to the remote server. + self._offsets = {} + self._testvs = [] + # This is a list of write vectors that will be sent to our + # remote server once we are directed to write things there. + self._writevs = [] + self._secrets = secrets + # The segment size needs to be a multiple of the k parameter -- + # any padding should have been carried out by the publisher + # already. + assert segment_size % required_shares == 0 + self._segment_size = segment_size + self._data_length = data_length + + # These are set later -- we define them here so that we can + # check for their existence easily + + # This is the root of the share hash tree -- the Merkle tree + # over the roots of the block hash trees computed for shares in + # this upload. + self._root_hash = None + + # We haven't yet written anything to the remote bucket. By + # setting this, we tell the _write method as much. The write + # method will then know that it also needs to add a write vector + # for the checkstring (or what we have of it) to the first write + # request. We'll then record that value for future use. If + # we're expecting something to be there already, we need to call + # set_checkstring before we write anything to tell the first + # write about that. + self._written = False + + # When writing data to the storage servers, we get a read vector + # for free. We'll read the checkstring, which will help us + # figure out what's gone wrong if a write fails. + self._readv = [(0, struct.calcsize(MDMFCHECKSTRING))] + + # We calculate the number of segments because it tells us + # where the salt part of the file ends/share segment begins, + # and also because it provides a useful amount of bounds checking. + self._num_segments = mathutil.div_ceil(self._data_length, + self._segment_size) + self._block_size = self._segment_size / self._required_shares + # We also calculate the share size, to help us with block + # constraints later. + tail_size = self._data_length % self._segment_size + if not tail_size: + self._tail_block_size = self._block_size + else: + self._tail_block_size = mathutil.next_multiple(tail_size, + self._required_shares) + self._tail_block_size /= self._required_shares + + # We already know where the sharedata starts; right after the end + # of the header (which is defined as the signable part + the offsets) + # We can also calculate where the encrypted private key begins + # from what we know know. + self._actual_block_size = self._block_size + SALT_SIZE + data_size = self._actual_block_size * (self._num_segments - 1) + data_size += self._tail_block_size + data_size += SALT_SIZE + self._offsets['enc_privkey'] = MDMFHEADERSIZE + + # We don't define offsets for these because we want them to be + # tightly packed -- this allows us to ignore the responsibility + # of padding individual values, and of removing that padding + # later. So nonconstant_start is where we start writing + # nonconstant data. 
+        nonconstant_start = self._offsets['enc_privkey']
+        nonconstant_start += PRIVATE_KEY_SIZE
+        nonconstant_start += SIGNATURE_SIZE
+        nonconstant_start += VERIFICATION_KEY_SIZE
+        nonconstant_start += SHARE_HASH_CHAIN_SIZE
+
+        self._offsets['share_data'] = nonconstant_start
+
+        # Finally, we know how big the share data will be, so we can
+        # figure out where the block hash tree needs to go.
+        # XXX: But this will go away if Zooko wants to make it so that
+        # you don't need to know the size of the file before you start
+        # uploading it.
+        self._offsets['block_hash_tree'] = self._offsets['share_data'] + \
+                    data_size
+
+        # Done. We can now start writing.
+
+
+    def set_checkstring(self,
+                        seqnum_or_checkstring,
+                        root_hash=None,
+                        salt=None):
+        """
+        Set the checkstring for the given shnum.
+
+        This can be invoked in one of two ways.
+
+        With one argument, I assume that you are giving me a literal
+        checkstring -- e.g., the output of get_checkstring. I will then
+        set that checkstring as it is. This form is used by unit tests.
+
+        With two arguments, I assume that you are giving me a sequence
+        number and root hash to make a checkstring from. In that case, I
+        will build a checkstring and set it for you. This form is used
+        by the publisher.
+
+        By default, I assume that I am writing new shares to the grid.
+        If you don't explicitly set your own checkstring, I will use
+        one that requires that the remote share not exist. You will want
+        to use this method if you are updating a share in-place;
+        otherwise, writes will fail.
+        """
+        # You're allowed to overwrite checkstrings with this method;
+        # I assume that users know what they are doing when they call
+        # it.
+        if root_hash:
+            checkstring = struct.pack(MDMFCHECKSTRING,
+                                      1,
+                                      seqnum_or_checkstring,
+                                      root_hash)
+        else:
+            checkstring = seqnum_or_checkstring
+
+        if checkstring == "":
+            # We special-case this, since len("") = 0, but we need
+            # length of 1 for the case of an empty share to work on the
+            # storage server, which is what a checkstring that is the
+            # empty string means.
+            self._testvs = []
+        else:
+            self._testvs = []
+            self._testvs.append((0, len(checkstring), "eq", checkstring))
+
+
+    def __repr__(self):
+        return "MDMFSlotWriteProxy for share %d" % self.shnum
+
+
+    def get_checkstring(self):
+        """
+        Given a share number, I return a representation of what the
+        checkstring for that share on the server will look like.
+
+        I am mostly used for tests.
+        """
+        if self._root_hash:
+            roothash = self._root_hash
+        else:
+            roothash = "\x00" * 32
+        return struct.pack(MDMFCHECKSTRING,
+                           1,
+                           self._seqnum,
+                           roothash)
+
+
+    def put_block(self, data, segnum, salt):
+        """
+        I queue a write vector for the data, salt, and segment number
+        provided to me. I return None, as I do not actually cause
+        anything to be written yet.
+        """
+        if segnum >= self._num_segments:
+            raise LayoutInvalid("I won't overwrite the block hash tree")
+        if len(salt) != SALT_SIZE:
+            raise LayoutInvalid("I was given a salt of size %d, but "
+                                "I wanted a salt of size %d" %
+                                (len(salt), SALT_SIZE))
+        if segnum + 1 == self._num_segments:
+            if len(data) != self._tail_block_size:
+                raise LayoutInvalid("I was given the wrong size block to write")
+        elif len(data) != self._block_size:
+            raise LayoutInvalid("I was given the wrong size block to write")
+
+        # We want to write at self._offsets['share_data'] plus
+        # segnum * the full (salt + block) size.
+ offset = self._offsets['share_data'] + \ + (self._actual_block_size * segnum) + data = salt + data + + self._writevs.append(tuple([offset, data])) + + + def put_encprivkey(self, encprivkey): + """ + I queue a write vector for the encrypted private key provided to + me. + """ + assert self._offsets + assert self._offsets['enc_privkey'] + # You shouldn't re-write the encprivkey after the block hash + # tree is written, since that could cause the private key to run + # into the block hash tree. Before it writes the block hash + # tree, the block hash tree writing method writes the offset of + # the share hash chain. So that's a good indicator of whether or + # not the block hash tree has been written. + if "signature" in self._offsets: + raise LayoutInvalid("You can't put the encrypted private key " + "after putting the share hash chain") + + self._offsets['share_hash_chain'] = self._offsets['enc_privkey'] + \ + len(encprivkey) + + self._writevs.append(tuple([self._offsets['enc_privkey'], encprivkey])) + + + def put_blockhashes(self, blockhashes): + """ + I queue a write vector to put the block hash tree in blockhashes + onto the remote server. + + The encrypted private key must be queued before the block hash + tree, since we need to know how large it is to know where the + block hash tree should go. The block hash tree must be put + before the share hash chain, since its size determines the + offset of the share hash chain. + """ + assert self._offsets + assert "block_hash_tree" in self._offsets + + assert isinstance(blockhashes, list) + + blockhashes_s = "".join(blockhashes) + self._offsets['EOF'] = self._offsets['block_hash_tree'] + len(blockhashes_s) + + self._writevs.append(tuple([self._offsets['block_hash_tree'], + blockhashes_s])) + + + def put_sharehashes(self, sharehashes): + """ + I queue a write vector to put the share hash chain in my + argument onto the remote server. + + The block hash tree must be queued before the share hash chain, + since we need to know where the block hash tree ends before we + can know where the share hash chain starts. The share hash chain + must be put before the signature, since the length of the packed + share hash chain determines the offset of the signature. Also, + semantically, you must know what the root of the block hash tree + is before you can generate a valid signature. + """ + assert isinstance(sharehashes, dict) + assert self._offsets + if "share_hash_chain" not in self._offsets: + raise LayoutInvalid("You must put the block hash tree before " + "putting the share hash chain") + + # The signature comes after the share hash chain. If the + # signature has already been written, we must not write another + # share hash chain. The signature writes the verification key + # offset when it gets sent to the remote server, so we look for + # that. + if "verification_key" in self._offsets: + raise LayoutInvalid("You must write the share hash chain " + "before you write the signature") + sharehashes_s = "".join([struct.pack(">H32s", i, sharehashes[i]) + for i in sorted(sharehashes.keys())]) + self._offsets['signature'] = self._offsets['share_hash_chain'] + \ + len(sharehashes_s) + self._writevs.append(tuple([self._offsets['share_hash_chain'], + sharehashes_s])) + + + def put_root_hash(self, roothash): + """ + Put the root hash (the root of the share hash tree) in the + remote slot. 
+ """ + # It does not make sense to be able to put the root + # hash without first putting the share hashes, since you need + # the share hashes to generate the root hash. + # + # Signature is defined by the routine that places the share hash + # chain, so it's a good thing to look for in finding out whether + # or not the share hash chain exists on the remote server. + if len(roothash) != HASH_SIZE: + raise LayoutInvalid("hashes and salts must be exactly %d bytes" + % HASH_SIZE) + self._root_hash = roothash + # To write both of these values, we update the checkstring on + # the remote server, which includes them + checkstring = self.get_checkstring() + self._writevs.append(tuple([0, checkstring])) + # This write, if successful, changes the checkstring, so we need + # to update our internal checkstring to be consistent with the + # one on the server. + + + def get_signable(self): + """ + Get the first seven fields of the mutable file; the parts that + are signed. + """ + if not self._root_hash: + raise LayoutInvalid("You need to set the root hash " + "before getting something to " + "sign") + return struct.pack(MDMFSIGNABLEHEADER, + 1, + self._seqnum, + self._root_hash, + self._required_shares, + self._total_shares, + self._segment_size, + self._data_length) + + + def put_signature(self, signature): + """ + I queue a write vector for the signature of the MDMF share. + + I require that the root hash and share hash chain have been put + to the grid before I will write the signature to the grid. + """ + if "signature" not in self._offsets: + raise LayoutInvalid("You must put the share hash chain " + # It does not make sense to put a signature without first + # putting the root hash and the salt hash (since otherwise + # the signature would be incomplete), so we don't allow that. + "before putting the signature") + if not self._root_hash: + raise LayoutInvalid("You must complete the signed prefix " + "before computing a signature") + # If we put the signature after we put the verification key, we + # could end up running into the verification key, and will + # probably screw up the offsets as well. So we don't allow that. + if "verification_key_end" in self._offsets: + raise LayoutInvalid("You can't put the signature after the " + "verification key") + # The method that writes the verification key defines the EOF + # offset before writing the verification key, so look for that. + self._offsets['verification_key'] = self._offsets['signature'] +\ + len(signature) + self._writevs.append(tuple([self._offsets['signature'], signature])) + + + def put_verification_key(self, verification_key): + """ + I queue a write vector for the verification key. + + I require that the signature have been written to the storage + server before I allow the verification key to be written to the + remote server. 
+ """ + if "verification_key" not in self._offsets: + raise LayoutInvalid("You must put the signature before you " + "can put the verification key") + + self._offsets['verification_key_end'] = \ + self._offsets['verification_key'] + len(verification_key) + assert self._offsets['verification_key_end'] <= self._offsets['share_data'] + self._writevs.append(tuple([self._offsets['verification_key'], + verification_key])) + + + def _get_offsets_tuple(self): + return tuple([(key, value) for key, value in self._offsets.items()]) + + + def get_verinfo(self): + return (self._seqnum, + self._root_hash, + self._required_shares, + self._total_shares, + self._segment_size, + self._data_length, + self.get_signable(), + self._get_offsets_tuple()) + + + def finish_publishing(self): + """ + I add a write vector for the offsets table, and then cause all + of the write vectors that I've dealt with so far to be published + to the remote server, ending the write process. + """ + if "verification_key_end" not in self._offsets: + raise LayoutInvalid("You must put the verification key before " + "you can publish the offsets") + offsets_offset = struct.calcsize(MDMFHEADERWITHOUTOFFSETS) + offsets = struct.pack(MDMFOFFSETS, + self._offsets['enc_privkey'], + self._offsets['share_hash_chain'], + self._offsets['signature'], + self._offsets['verification_key'], + self._offsets['verification_key_end'], + self._offsets['share_data'], + self._offsets['block_hash_tree'], + self._offsets['EOF']) + self._writevs.append(tuple([offsets_offset, offsets])) + encoding_parameters_offset = struct.calcsize(MDMFCHECKSTRING) + params = struct.pack(">BBQQ", + self._required_shares, + self._total_shares, + self._segment_size, + self._data_length) + self._writevs.append(tuple([encoding_parameters_offset, params])) + return self._write(self._writevs) + + + def _write(self, datavs, on_failure=None, on_success=None): + """I write the data vectors in datavs to the remote slot.""" + tw_vectors = {} + if not self._testvs: + self._testvs = [] + self._testvs.append(tuple([0, 1, "eq", ""])) + if not self._written: + # Write a new checkstring to the share when we write it, so + # that we have something to check later. + new_checkstring = self.get_checkstring() + datavs.append((0, new_checkstring)) + def _first_write(): + self._written = True + self._testvs = [(0, len(new_checkstring), "eq", new_checkstring)] + on_success = _first_write + tw_vectors[self.shnum] = (self._testvs, datavs, None) + d = self._rref.callRemote("slot_testv_and_readv_and_writev", + self._storage_index, + self._secrets, + tw_vectors, + self._readv) + def _result(results): + if isinstance(results, failure.Failure) or not results[0]: + # Do nothing; the write was unsuccessful. + if on_failure: on_failure() + else: + if on_success: on_success() + return results + d.addCallback(_result) + return d + + +class MDMFSlotReadProxy: + """ + I read from a mutable slot filled with data written in the MDMF data + format (which is described above). + + I can be initialized with some amount of data, which I will use (if + it is valid) to eliminate some of the need to fetch it from servers. + """ + def __init__(self, + rref, + storage_index, + shnum, + data=""): + # Start the initialization process. + self._rref = rref + self._storage_index = storage_index + self.shnum = shnum + + # Before doing anything, the reader is probably going to want to + # verify that the signature is correct. To do that, they'll need + # the verification key, and the signature. 
To get those, we'll + # need the offset table. So fetch the offset table on the + # assumption that that will be the first thing that a reader is + # going to do. + + # The fact that these encoding parameters are None tells us + # that we haven't yet fetched them from the remote share, so we + # should. We could just not set them, but the checks will be + # easier to read if we don't have to use hasattr. + self._version_number = None + self._sequence_number = None + self._root_hash = None + # Filled in if we're dealing with an SDMF file. Unused + # otherwise. + self._salt = None + self._required_shares = None + self._total_shares = None + self._segment_size = None + self._data_length = None + self._offsets = None + + # If the user has chosen to initialize us with some data, we'll + # try to satisfy subsequent data requests with that data before + # asking the storage server for it. If + self._data = data + # The way callers interact with cache in the filenode returns + # None if there isn't any cached data, but the way we index the + # cached data requires a string, so convert None to "". + if self._data == None: + self._data = "" + + self._queue_observers = observer.ObserverList() + self._queue_errbacks = observer.ObserverList() + self._readvs = [] + + + def _maybe_fetch_offsets_and_header(self, force_remote=False): + """ + I fetch the offset table and the header from the remote slot if + I don't already have them. If I do have them, I do nothing and + return an empty Deferred. + """ + if self._offsets: + return defer.succeed(None) + # At this point, we may be either SDMF or MDMF. Fetching 107 + # bytes will be enough to get header and offsets for both SDMF and + # MDMF, though we'll be left with 4 more bytes than we + # need if this ends up being MDMF. This is probably less + # expensive than the cost of a second roundtrip. + readvs = [(0, 123)] + d = self._read(readvs, force_remote) + d.addCallback(self._process_encoding_parameters) + d.addCallback(self._process_offsets) + return d + + + def _process_encoding_parameters(self, encoding_parameters): + assert self.shnum in encoding_parameters + encoding_parameters = encoding_parameters[self.shnum][0] + # The first byte is the version number. It will tell us what + # to do next. + (verno,) = struct.unpack(">B", encoding_parameters[:1]) + if verno == MDMF_VERSION: + read_size = MDMFHEADERWITHOUTOFFSETSSIZE + (verno, + seqnum, + root_hash, + k, + n, + segsize, + datalen) = struct.unpack(MDMFHEADERWITHOUTOFFSETS, + encoding_parameters[:read_size]) + if segsize == 0 and datalen == 0: + # Empty file, no segments. + self._num_segments = 0 + else: + self._num_segments = mathutil.div_ceil(datalen, segsize) + + elif verno == SDMF_VERSION: + read_size = SIGNED_PREFIX_LENGTH + (verno, + seqnum, + root_hash, + salt, + k, + n, + segsize, + datalen) = struct.unpack(">BQ32s16s BBQQ", + encoding_parameters[:SIGNED_PREFIX_LENGTH]) + self._salt = salt + if segsize == 0 and datalen == 0: + # empty file + self._num_segments = 0 + else: + # non-empty SDMF files have one segment. 
+ self._num_segments = 1 + else: + raise UnknownVersionError("You asked me to read mutable file " + "version %d, but I only understand " + "%d and %d" % (verno, SDMF_VERSION, + MDMF_VERSION)) + + self._version_number = verno + self._sequence_number = seqnum + self._root_hash = root_hash + self._required_shares = k + self._total_shares = n + self._segment_size = segsize + self._data_length = datalen + + self._block_size = self._segment_size / self._required_shares + # We can upload empty files, and need to account for this fact + # so as to avoid zero-division and zero-modulo errors. + if datalen > 0: + tail_size = self._data_length % self._segment_size + else: + tail_size = 0 + if not tail_size: + self._tail_block_size = self._block_size + else: + self._tail_block_size = mathutil.next_multiple(tail_size, + self._required_shares) + self._tail_block_size /= self._required_shares + + return encoding_parameters + + + def _process_offsets(self, offsets): + if self._version_number == 0: + read_size = OFFSETS_LENGTH + read_offset = SIGNED_PREFIX_LENGTH + end = read_size + read_offset + (signature, + share_hash_chain, + block_hash_tree, + share_data, + enc_privkey, + EOF) = struct.unpack(">LLLLQQ", + offsets[read_offset:end]) + self._offsets = {} + self._offsets['signature'] = signature + self._offsets['share_data'] = share_data + self._offsets['block_hash_tree'] = block_hash_tree + self._offsets['share_hash_chain'] = share_hash_chain + self._offsets['enc_privkey'] = enc_privkey + self._offsets['EOF'] = EOF + + elif self._version_number == 1: + read_offset = MDMFHEADERWITHOUTOFFSETSSIZE + read_length = MDMFOFFSETS_LENGTH + end = read_offset + read_length + (encprivkey, + sharehashes, + signature, + verification_key, + verification_key_end, + sharedata, + blockhashes, + eof) = struct.unpack(MDMFOFFSETS, + offsets[read_offset:end]) + self._offsets = {} + self._offsets['enc_privkey'] = encprivkey + self._offsets['block_hash_tree'] = blockhashes + self._offsets['share_hash_chain'] = sharehashes + self._offsets['signature'] = signature + self._offsets['verification_key'] = verification_key + self._offsets['verification_key_end']= \ + verification_key_end + self._offsets['EOF'] = eof + self._offsets['share_data'] = sharedata + + + def get_block_and_salt(self, segnum, queue=False): + """ + I return (block, salt), where block is the block data and + salt is the salt used to encrypt that segment. 
+ """ + d = self._maybe_fetch_offsets_and_header() + def _then(ignored): + base_share_offset = self._offsets['share_data'] + + if segnum + 1 > self._num_segments: + raise LayoutInvalid("Not a valid segment number") + + if self._version_number == 0: + share_offset = base_share_offset + self._block_size * segnum + else: + share_offset = base_share_offset + (self._block_size + \ + SALT_SIZE) * segnum + if segnum + 1 == self._num_segments: + data = self._tail_block_size + else: + data = self._block_size + + if self._version_number == 1: + data += SALT_SIZE + + readvs = [(share_offset, data)] + return readvs + d.addCallback(_then) + d.addCallback(lambda readvs: + self._read(readvs, queue=queue)) + def _process_results(results): + assert self.shnum in results + if self._version_number == 0: + # We only read the share data, but we know the salt from + # when we fetched the header + data = results[self.shnum] + if not data: + data = "" + else: + assert len(data) == 1 + data = data[0] + salt = self._salt + else: + data = results[self.shnum] + if not data: + salt = data = "" + else: + salt_and_data = results[self.shnum][0] + salt = salt_and_data[:SALT_SIZE] + data = salt_and_data[SALT_SIZE:] + return data, salt + d.addCallback(_process_results) + return d + + + def get_blockhashes(self, needed=None, queue=False, force_remote=False): + """ + I return the block hash tree + + I take an optional argument, needed, which is a set of indices + correspond to hashes that I should fetch. If this argument is + missing, I will fetch the entire block hash tree; otherwise, I + may attempt to fetch fewer hashes, based on what needed says + that I should do. Note that I may fetch as many hashes as I + want, so long as the set of hashes that I do fetch is a superset + of the ones that I am asked for, so callers should be prepared + to tolerate additional hashes. + """ + # TODO: Return only the parts of the block hash tree necessary + # to validate the blocknum provided? + # This is a good idea, but it is hard to implement correctly. It + # is bad to fetch any one block hash more than once, so we + # probably just want to fetch the whole thing at once and then + # serve it. + if needed == set([]): + return defer.succeed([]) + d = self._maybe_fetch_offsets_and_header() + def _then(ignored): + blockhashes_offset = self._offsets['block_hash_tree'] + if self._version_number == 1: + blockhashes_length = self._offsets['EOF'] - blockhashes_offset + else: + blockhashes_length = self._offsets['share_data'] - blockhashes_offset + readvs = [(blockhashes_offset, blockhashes_length)] + return readvs + d.addCallback(_then) + d.addCallback(lambda readvs: + self._read(readvs, queue=queue, force_remote=force_remote)) + def _build_block_hash_tree(results): + assert self.shnum in results + + rawhashes = results[self.shnum][0] + results = [rawhashes[i:i+HASH_SIZE] + for i in range(0, len(rawhashes), HASH_SIZE)] + return results + d.addCallback(_build_block_hash_tree) + return d + + + def get_sharehashes(self, needed=None, queue=False, force_remote=False): + """ + I return the part of the share hash chain placed to validate + this share. + + I take an optional argument, needed. Needed is a set of indices + that correspond to the hashes that I should fetch. If needed is + not present, I will fetch and return the entire share hash + chain. Otherwise, I may fetch and return any part of the share + hash chain that is a superset of the part that I am asked to + fetch. 
Callers should be prepared to deal with more hashes than + they've asked for. + """ + if needed == set([]): + return defer.succeed([]) + d = self._maybe_fetch_offsets_and_header() + + def _make_readvs(ignored): + sharehashes_offset = self._offsets['share_hash_chain'] + if self._version_number == 0: + sharehashes_length = self._offsets['block_hash_tree'] - sharehashes_offset + else: + sharehashes_length = self._offsets['signature'] - sharehashes_offset + readvs = [(sharehashes_offset, sharehashes_length)] + return readvs + d.addCallback(_make_readvs) + d.addCallback(lambda readvs: + self._read(readvs, queue=queue, force_remote=force_remote)) + def _build_share_hash_chain(results): + assert self.shnum in results + + sharehashes = results[self.shnum][0] + results = [sharehashes[i:i+(HASH_SIZE + 2)] + for i in range(0, len(sharehashes), HASH_SIZE + 2)] + results = dict([struct.unpack(">H32s", data) + for data in results]) + return results + d.addCallback(_build_share_hash_chain) + return d + + + def get_encprivkey(self, queue=False): + """ + I return the encrypted private key. + """ + d = self._maybe_fetch_offsets_and_header() + + def _make_readvs(ignored): + privkey_offset = self._offsets['enc_privkey'] + if self._version_number == 0: + privkey_length = self._offsets['EOF'] - privkey_offset + else: + privkey_length = self._offsets['share_hash_chain'] - privkey_offset + readvs = [(privkey_offset, privkey_length)] + return readvs + d.addCallback(_make_readvs) + d.addCallback(lambda readvs: + self._read(readvs, queue=queue)) + def _process_results(results): + assert self.shnum in results + privkey = results[self.shnum][0] + return privkey + d.addCallback(_process_results) + return d + + + def get_signature(self, queue=False): + """ + I return the signature of my share. + """ + d = self._maybe_fetch_offsets_and_header() + + def _make_readvs(ignored): + signature_offset = self._offsets['signature'] + if self._version_number == 1: + signature_length = self._offsets['verification_key'] - signature_offset + else: + signature_length = self._offsets['share_hash_chain'] - signature_offset + readvs = [(signature_offset, signature_length)] + return readvs + d.addCallback(_make_readvs) + d.addCallback(lambda readvs: + self._read(readvs, queue=queue)) + def _process_results(results): + assert self.shnum in results + signature = results[self.shnum][0] + return signature + d.addCallback(_process_results) + return d + + + def get_verification_key(self, queue=False): + """ + I return the verification key. + """ + d = self._maybe_fetch_offsets_and_header() + + def _make_readvs(ignored): + if self._version_number == 1: + vk_offset = self._offsets['verification_key'] + vk_length = self._offsets['verification_key_end'] - vk_offset + else: + vk_offset = struct.calcsize(">BQ32s16sBBQQLLLLQQ") + vk_length = self._offsets['signature'] - vk_offset + readvs = [(vk_offset, vk_length)] + return readvs + d.addCallback(_make_readvs) + d.addCallback(lambda readvs: + self._read(readvs, queue=queue)) + def _process_results(results): + assert self.shnum in results + verification_key = results[self.shnum][0] + return verification_key + d.addCallback(_process_results) + return d + + + def get_encoding_parameters(self): + """ + I return (k, n, segsize, datalen) + """ + d = self._maybe_fetch_offsets_and_header() + d.addCallback(lambda ignored: + (self._required_shares, + self._total_shares, + self._segment_size, + self._data_length)) + return d + + + def get_seqnum(self): + """ + I return the sequence number for this share. 
+ """ + d = self._maybe_fetch_offsets_and_header() + d.addCallback(lambda ignored: + self._sequence_number) + return d + + + def get_root_hash(self): + """ + I return the root of the block hash tree + """ + d = self._maybe_fetch_offsets_and_header() + d.addCallback(lambda ignored: self._root_hash) + return d + + + def get_checkstring(self): + """ + I return the packed representation of the following: + + - version number + - sequence number + - root hash + - salt hash + + which my users use as a checkstring to detect other writers. + """ + d = self._maybe_fetch_offsets_and_header() + def _build_checkstring(ignored): + if self._salt: + checkstring = struct.pack(PREFIX, + self._version_number, + self._sequence_number, + self._root_hash, + self._salt) + else: + checkstring = struct.pack(MDMFCHECKSTRING, + self._version_number, + self._sequence_number, + self._root_hash) + + return checkstring + d.addCallback(_build_checkstring) + return d + + + def get_prefix(self, force_remote): + d = self._maybe_fetch_offsets_and_header(force_remote) + d.addCallback(lambda ignored: + self._build_prefix()) + return d + + + def _build_prefix(self): + # The prefix is another name for the part of the remote share + # that gets signed. It consists of everything up to and + # including the datalength, packed by struct. + if self._version_number == SDMF_VERSION: + return struct.pack(SIGNED_PREFIX, + self._version_number, + self._sequence_number, + self._root_hash, + self._salt, + self._required_shares, + self._total_shares, + self._segment_size, + self._data_length) + + else: + return struct.pack(MDMFSIGNABLEHEADER, + self._version_number, + self._sequence_number, + self._root_hash, + self._required_shares, + self._total_shares, + self._segment_size, + self._data_length) + + + def _get_offsets_tuple(self): + # The offsets tuple is another component of the version + # information tuple. It is basically our offsets dictionary, + # itemized and in a tuple. + return self._offsets.copy() + + + def get_verinfo(self): + """ + I return my verinfo tuple. This is used by the ServermapUpdater + to keep track of versions of mutable files. + + The verinfo tuple for MDMF files contains: + - seqnum + - root hash + - a blank (nothing) + - segsize + - datalen + - k + - n + - prefix (the thing that you sign) + - a tuple of offsets + + We include the nonce in MDMF to simplify processing of version + information tuples. + + The verinfo tuple for SDMF files is the same, but contains a + 16-byte IV instead of a hash of salts. + """ + d = self._maybe_fetch_offsets_and_header() + def _build_verinfo(ignored): + if self._version_number == SDMF_VERSION: + salt_to_use = self._salt + else: + salt_to_use = None + return (self._sequence_number, + self._root_hash, + salt_to_use, + self._segment_size, + self._data_length, + self._required_shares, + self._total_shares, + self._build_prefix(), + self._get_offsets_tuple()) + d.addCallback(_build_verinfo) + return d + + + def flush(self): + """ + I flush my queue of read vectors. 
+ """ + d = self._read(self._readvs) + def _then(results): + self._readvs = [] + if isinstance(results, failure.Failure): + self._queue_errbacks.notify(results) + else: + self._queue_observers.notify(results) + self._queue_observers = observer.ObserverList() + self._queue_errbacks = observer.ObserverList() + d.addBoth(_then) + + + def _read(self, readvs, force_remote=False, queue=False): + unsatisfiable = filter(lambda x: x[0] + x[1] > len(self._data), readvs) + # TODO: It's entirely possible to tweak this so that it just + # fulfills the requests that it can, and not demand that all + # requests are satisfiable before running it. + if not unsatisfiable and not force_remote: + results = [self._data[offset:offset+length] + for (offset, length) in readvs] + results = {self.shnum: results} + return defer.succeed(results) + else: + if queue: + start = len(self._readvs) + self._readvs += readvs + end = len(self._readvs) + def _get_results(results, start, end): + if not self.shnum in results: + return {self._shnum: [""]} + return {self.shnum: results[self.shnum][start:end]} + d = defer.Deferred() + d.addCallback(_get_results, start, end) + self._queue_observers.subscribe(d.callback) + self._queue_errbacks.subscribe(d.errback) + return d + return self._rref.callRemote("slot_readv", + self._storage_index, + [self.shnum], + readvs) + + + def is_sdmf(self): + """I tell my caller whether or not my remote file is SDMF or MDMF + """ + d = self._maybe_fetch_offsets_and_header() + d.addCallback(lambda ignored: + self._version_number == 0) + return d + + +class LayoutInvalid(Exception): + """ + This isn't a valid MDMF mutable file + """ diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index 81a889ed8..549b839fa 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -1,4 +1,4 @@ -import time, os.path, platform, stat, re, simplejson, struct +import time, os.path, platform, stat, re, simplejson, struct, shutil import mock @@ -20,8 +20,16 @@ from allmydata.storage.crawler import BucketCountingCrawler from allmydata.storage.expirer import LeaseCheckingCrawler from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \ ReadBucketProxy +from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \ + LayoutInvalid, MDMFSIGNABLEHEADER, \ + SIGNED_PREFIX, MDMFHEADER, \ + MDMFOFFSETS, SDMFSlotWriteProxy, \ + PRIVATE_KEY_SIZE, \ + SIGNATURE_SIZE, \ + VERIFICATION_KEY_SIZE, \ + SHARE_HASH_CHAIN_SIZE from allmydata.interfaces import BadWriteEnablerError -from allmydata.test.common import LoggingServiceParent +from allmydata.test.common import LoggingServiceParent, ShouldFailMixin from allmydata.test.common_web import WebRenderingMixin from allmydata.test.no_network import NoNetworkServer from allmydata.web.storage import StorageStatus, remove_prefix @@ -100,12 +108,23 @@ class Bucket(unittest.TestCase): class RemoteBucket: + def __init__(self): + self.read_count = 0 + self.write_count = 0 + def callRemote(self, methname, *args, **kwargs): def _call(): meth = getattr(self.target, "remote_" + methname) return meth(*args, **kwargs) + + if methname == "slot_readv": + self.read_count += 1 + if "writev" in methname: + self.write_count += 1 + return defer.maybeDeferred(_call) + class BucketProxy(unittest.TestCase): def make_bucket(self, name, size): basedir = os.path.join("storage", "BucketProxy", name) @@ -1288,6 +1307,1462 @@ class MutableServer(unittest.TestCase): self.failUnless(os.path.exists(prefixdir), 
prefixdir) self.failIf(os.path.exists(bucketdir), bucketdir) + +class MDMFProxies(unittest.TestCase, ShouldFailMixin): + def setUp(self): + self.sparent = LoggingServiceParent() + self._lease_secret = itertools.count() + self.ss = self.create("MDMFProxies storage test server") + self.rref = RemoteBucket() + self.rref.target = self.ss + self.secrets = (self.write_enabler("we_secret"), + self.renew_secret("renew_secret"), + self.cancel_secret("cancel_secret")) + self.segment = "aaaaaa" + self.block = "aa" + self.salt = "a" * 16 + self.block_hash = "a" * 32 + self.block_hash_tree = [self.block_hash for i in xrange(6)] + self.share_hash = self.block_hash + self.share_hash_chain = dict([(i, self.share_hash) for i in xrange(6)]) + self.signature = "foobarbaz" + self.verification_key = "vvvvvv" + self.encprivkey = "private" + self.root_hash = self.block_hash + self.salt_hash = self.root_hash + self.salt_hash_tree = [self.salt_hash for i in xrange(6)] + self.block_hash_tree_s = self.serialize_blockhashes(self.block_hash_tree) + self.share_hash_chain_s = self.serialize_sharehashes(self.share_hash_chain) + # blockhashes and salt hashes are serialized in the same way, + # only we lop off the first element and store that in the + # header. + self.salt_hash_tree_s = self.serialize_blockhashes(self.salt_hash_tree[1:]) + + + def tearDown(self): + self.sparent.stopService() + shutil.rmtree(self.workdir("MDMFProxies storage test server")) + + + def write_enabler(self, we_tag): + return hashutil.tagged_hash("we_blah", we_tag) + + + def renew_secret(self, tag): + return hashutil.tagged_hash("renew_blah", str(tag)) + + + def cancel_secret(self, tag): + return hashutil.tagged_hash("cancel_blah", str(tag)) + + + def workdir(self, name): + basedir = os.path.join("storage", "MutableServer", name) + return basedir + + + def create(self, name): + workdir = self.workdir(name) + ss = StorageServer(workdir, "\x00" * 20) + ss.setServiceParent(self.sparent) + return ss + + + def build_test_mdmf_share(self, tail_segment=False, empty=False): + # Start with the checkstring + data = struct.pack(">BQ32s", + 1, + 0, + self.root_hash) + self.checkstring = data + # Next, the encoding parameters + if tail_segment: + data += struct.pack(">BBQQ", + 3, + 10, + 6, + 33) + elif empty: + data += struct.pack(">BBQQ", + 3, + 10, + 0, + 0) + else: + data += struct.pack(">BBQQ", + 3, + 10, + 6, + 36) + # Now we'll build the offsets. + sharedata = "" + if not tail_segment and not empty: + for i in xrange(6): + sharedata += self.salt + self.block + elif tail_segment: + for i in xrange(5): + sharedata += self.salt + self.block + sharedata += self.salt + "a" + + # The encrypted private key comes after the shares + salts + offset_size = struct.calcsize(MDMFOFFSETS) + encrypted_private_key_offset = len(data) + offset_size + # The share has chain comes after the private key + sharehashes_offset = encrypted_private_key_offset + \ + len(self.encprivkey) + + # The signature comes after the share hash chain. 
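+        # (For orientation, the full layout this helper constructs is:
+        # header (checkstring + encoding parameters + offsets), the
+        # encrypted private key, the share hash chain, the signature,
+        # the verification key, padding out to the share data offset,
+        # the per-segment salt+block pairs, and finally the block hash
+        # tree.)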
+ signature_offset = sharehashes_offset + len(self.share_hash_chain_s) + + verification_key_offset = signature_offset + len(self.signature) + verification_key_end = verification_key_offset + \ + len(self.verification_key) + + share_data_offset = offset_size + share_data_offset += PRIVATE_KEY_SIZE + share_data_offset += SIGNATURE_SIZE + share_data_offset += VERIFICATION_KEY_SIZE + share_data_offset += SHARE_HASH_CHAIN_SIZE + + blockhashes_offset = share_data_offset + len(sharedata) + eof_offset = blockhashes_offset + len(self.block_hash_tree_s) + + data += struct.pack(MDMFOFFSETS, + encrypted_private_key_offset, + sharehashes_offset, + signature_offset, + verification_key_offset, + verification_key_end, + share_data_offset, + blockhashes_offset, + eof_offset) + + self.offsets = {} + self.offsets['enc_privkey'] = encrypted_private_key_offset + self.offsets['block_hash_tree'] = blockhashes_offset + self.offsets['share_hash_chain'] = sharehashes_offset + self.offsets['signature'] = signature_offset + self.offsets['verification_key'] = verification_key_offset + self.offsets['share_data'] = share_data_offset + self.offsets['verification_key_end'] = verification_key_end + self.offsets['EOF'] = eof_offset + + # the private key, + data += self.encprivkey + # the sharehashes + data += self.share_hash_chain_s + # the signature, + data += self.signature + # and the verification key + data += self.verification_key + # Then we'll add in gibberish until we get to the right point. + nulls = "".join([" " for i in xrange(len(data), share_data_offset)]) + data += nulls + + # Then the share data + data += sharedata + # the blockhashes + data += self.block_hash_tree_s + return data + + + def write_test_share_to_server(self, + storage_index, + tail_segment=False, + empty=False): + """ + I write some data for the read tests to read to self.ss + + If tail_segment=True, then I will write a share that has a + smaller tail segment than other segments. + """ + write = self.ss.remote_slot_testv_and_readv_and_writev + data = self.build_test_mdmf_share(tail_segment, empty) + # Finally, we write the whole thing to the storage server in one + # pass. 
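+        # The server-side write API takes, per share number, a tuple of
+        # (test vector, write vector, new length). Each test vector entry
+        # is (offset, length, operator, specimen) and each write vector
+        # entry is (offset, data). Comparing one byte at offset 0 against
+        # the empty string passes only while the slot is still empty.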
+ testvs = [(0, 1, "eq", "")] + tws = {} + tws[0] = (testvs, [(0, data)], None) + readv = [(0, 1)] + results = write(storage_index, self.secrets, tws, readv) + self.failUnless(results[0]) + + + def build_test_sdmf_share(self, empty=False): + if empty: + sharedata = "" + else: + sharedata = self.segment * 6 + self.sharedata = sharedata + blocksize = len(sharedata) / 3 + block = sharedata[:blocksize] + self.blockdata = block + prefix = struct.pack(">BQ32s16s BBQQ", + 0, # version, + 0, + self.root_hash, + self.salt, + 3, + 10, + len(sharedata), + len(sharedata), + ) + post_offset = struct.calcsize(">BQ32s16sBBQQLLLLQQ") + signature_offset = post_offset + len(self.verification_key) + sharehashes_offset = signature_offset + len(self.signature) + blockhashes_offset = sharehashes_offset + len(self.share_hash_chain_s) + sharedata_offset = blockhashes_offset + len(self.block_hash_tree_s) + encprivkey_offset = sharedata_offset + len(block) + eof_offset = encprivkey_offset + len(self.encprivkey) + offsets = struct.pack(">LLLLQQ", + signature_offset, + sharehashes_offset, + blockhashes_offset, + sharedata_offset, + encprivkey_offset, + eof_offset) + final_share = "".join([prefix, + offsets, + self.verification_key, + self.signature, + self.share_hash_chain_s, + self.block_hash_tree_s, + block, + self.encprivkey]) + self.offsets = {} + self.offsets['signature'] = signature_offset + self.offsets['share_hash_chain'] = sharehashes_offset + self.offsets['block_hash_tree'] = blockhashes_offset + self.offsets['share_data'] = sharedata_offset + self.offsets['enc_privkey'] = encprivkey_offset + self.offsets['EOF'] = eof_offset + return final_share + + + def write_sdmf_share_to_server(self, + storage_index, + empty=False): + # Some tests need SDMF shares to verify that we can still + # read them. This method writes one, which resembles but is not + assert self.rref + write = self.ss.remote_slot_testv_and_readv_and_writev + share = self.build_test_sdmf_share(empty) + testvs = [(0, 1, "eq", "")] + tws = {} + tws[0] = (testvs, [(0, share)], None) + readv = [] + results = write(storage_index, self.secrets, tws, readv) + self.failUnless(results[0]) + + + def test_read(self): + self.write_test_share_to_server("si1") + mr = MDMFSlotReadProxy(self.rref, "si1", 0) + # Check that every method equals what we expect it to. 
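+        # Every accessor on the read proxy returns a Deferred, so the
+        # assertions below are chained with addCallback rather than made
+        # inline.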
+        d = defer.succeed(None)
+        def _check_block_and_salt((block, salt)):
+            self.failUnlessEqual(block, self.block)
+            self.failUnlessEqual(salt, self.salt)
+
+        for i in xrange(6):
+            d.addCallback(lambda ignored, i=i:
+                mr.get_block_and_salt(i))
+            d.addCallback(_check_block_and_salt)
+
+        d.addCallback(lambda ignored:
+            mr.get_encprivkey())
+        d.addCallback(lambda encprivkey:
+            self.failUnlessEqual(self.encprivkey, encprivkey))
+
+        d.addCallback(lambda ignored:
+            mr.get_blockhashes())
+        d.addCallback(lambda blockhashes:
+            self.failUnlessEqual(self.block_hash_tree, blockhashes))
+
+        d.addCallback(lambda ignored:
+            mr.get_sharehashes())
+        d.addCallback(lambda sharehashes:
+            self.failUnlessEqual(self.share_hash_chain, sharehashes))
+
+        d.addCallback(lambda ignored:
+            mr.get_signature())
+        d.addCallback(lambda signature:
+            self.failUnlessEqual(signature, self.signature))
+
+        d.addCallback(lambda ignored:
+            mr.get_verification_key())
+        d.addCallback(lambda verification_key:
+            self.failUnlessEqual(verification_key, self.verification_key))
+
+        d.addCallback(lambda ignored:
+            mr.get_seqnum())
+        d.addCallback(lambda seqnum:
+            self.failUnlessEqual(seqnum, 0))
+
+        d.addCallback(lambda ignored:
+            mr.get_root_hash())
+        d.addCallback(lambda root_hash:
+            self.failUnlessEqual(self.root_hash, root_hash))
+
+        d.addCallback(lambda ignored:
+            mr.get_seqnum())
+        d.addCallback(lambda seqnum:
+            self.failUnlessEqual(0, seqnum))
+
+        d.addCallback(lambda ignored:
+            mr.get_encoding_parameters())
+        def _check_encoding_parameters((k, n, segsize, datalen)):
+            self.failUnlessEqual(k, 3)
+            self.failUnlessEqual(n, 10)
+            self.failUnlessEqual(segsize, 6)
+            self.failUnlessEqual(datalen, 36)
+        d.addCallback(_check_encoding_parameters)
+
+        d.addCallback(lambda ignored:
+            mr.get_checkstring())
+        d.addCallback(lambda checkstring:
+            self.failUnlessEqual(checkstring, self.checkstring))
+        return d
+
+
+    def test_read_with_different_tail_segment_size(self):
+        self.write_test_share_to_server("si1", tail_segment=True)
+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
+        d = mr.get_block_and_salt(5)
+        def _check_tail_segment(results):
+            block, salt = results
+            self.failUnlessEqual(len(block), 1)
+            self.failUnlessEqual(block, "a")
+        d.addCallback(_check_tail_segment)
+        return d
+
+
+    def test_get_block_with_invalid_segnum(self):
+        self.write_test_share_to_server("si1")
+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
+        d = defer.succeed(None)
+        d.addCallback(lambda ignored:
+            self.shouldFail(LayoutInvalid, "test invalid segnum",
+                            None,
+                            mr.get_block_and_salt, 7))
+        return d
+
+
+    def test_get_encoding_parameters_first(self):
+        self.write_test_share_to_server("si1")
+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
+        d = mr.get_encoding_parameters()
+        def _check_encoding_parameters((k, n, segment_size, datalen)):
+            self.failUnlessEqual(k, 3)
+            self.failUnlessEqual(n, 10)
+            self.failUnlessEqual(segment_size, 6)
+            self.failUnlessEqual(datalen, 36)
+        d.addCallback(_check_encoding_parameters)
+        return d
+
+
+    def test_get_seqnum_first(self):
+        self.write_test_share_to_server("si1")
+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
+        d = mr.get_seqnum()
+        d.addCallback(lambda seqnum:
+            self.failUnlessEqual(seqnum, 0))
+        return d
+
+
+    def test_get_root_hash_first(self):
+        self.write_test_share_to_server("si1")
+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
+        d = mr.get_root_hash()
+        d.addCallback(lambda root_hash:
+            self.failUnlessEqual(root_hash, self.root_hash))
+        return d
+
+
+    def test_get_checkstring_first(self):
+        self.write_test_share_to_server("si1")
+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
+        d = mr.get_checkstring()
+        d.addCallback(lambda checkstring:
+            self.failUnlessEqual(checkstring, self.checkstring))
+        return d
+
+
+    def test_write_read_vectors(self):
+        # When we write, the storage server returns a read vector along
+        # with the result of the write. If a write fails because the
+        # test vectors did not match, this read vector can help us to
+        # diagnose the problem. This test ensures that the read vector
+        # is working appropriately.
+        mw = self._make_new_mw("si1", 0)
+
+        for i in xrange(6):
+            mw.put_block(self.block, i, self.salt)
+        mw.put_encprivkey(self.encprivkey)
+        mw.put_blockhashes(self.block_hash_tree)
+        mw.put_sharehashes(self.share_hash_chain)
+        mw.put_root_hash(self.root_hash)
+        mw.put_signature(self.signature)
+        mw.put_verification_key(self.verification_key)
+        d = mw.finish_publishing()
+        def _then(results):
+            self.failUnlessEqual(len(results), 2)
+            result, readv = results
+            self.failUnless(result)
+            self.failIf(readv)
+            self.old_checkstring = mw.get_checkstring()
+            mw.set_checkstring("")
+        d.addCallback(_then)
+        d.addCallback(lambda ignored:
+            mw.finish_publishing())
+        def _then_again(results):
+            self.failUnlessEqual(len(results), 2)
+            result, readvs = results
+            self.failIf(result)
+            self.failUnlessIn(0, readvs)
+            readv = readvs[0][0]
+            self.failUnlessEqual(readv, self.old_checkstring)
+        d.addCallback(_then_again)
+        # The checkstring remains the same for the rest of the process.
+        return d
+
+
+    def test_private_key_after_share_hash_chain(self):
+        mw = self._make_new_mw("si1", 0)
+        d = defer.succeed(None)
+        for i in xrange(6):
+            d.addCallback(lambda ignored, i=i:
+                mw.put_block(self.block, i, self.salt))
+        d.addCallback(lambda ignored:
+            mw.put_encprivkey(self.encprivkey))
+        d.addCallback(lambda ignored:
+            mw.put_sharehashes(self.share_hash_chain))
+
+        # Now try to put the private key again.
+        d.addCallback(lambda ignored:
+            self.shouldFail(LayoutInvalid, "test repeat private key",
+                            None,
+                            mw.put_encprivkey, self.encprivkey))
+        return d
+
+
+    def test_signature_after_verification_key(self):
+        mw = self._make_new_mw("si1", 0)
+        d = defer.succeed(None)
+        # Put everything up to and including the verification key.
+        for i in xrange(6):
+            d.addCallback(lambda ignored, i=i:
+                mw.put_block(self.block, i, self.salt))
+        d.addCallback(lambda ignored:
+            mw.put_encprivkey(self.encprivkey))
+        d.addCallback(lambda ignored:
+            mw.put_blockhashes(self.block_hash_tree))
+        d.addCallback(lambda ignored:
+            mw.put_sharehashes(self.share_hash_chain))
+        d.addCallback(lambda ignored:
+            mw.put_root_hash(self.root_hash))
+        d.addCallback(lambda ignored:
+            mw.put_signature(self.signature))
+        d.addCallback(lambda ignored:
+            mw.put_verification_key(self.verification_key))
+        # Now try to put the signature again. This should fail.
+        d.addCallback(lambda ignored:
+            self.shouldFail(LayoutInvalid, "signature after verification",
+                            None,
+                            mw.put_signature, self.signature))
+        return d
+
+
+    def test_uncoordinated_write(self):
+        # Make two mutable writers, both pointing to the same storage
+        # server, both at the same storage index, and try writing to the
+        # same share.
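+        # The first write should succeed; it changes the share on the
+        # server, so the second writer's test vectors should no longer
+        # match and its publish should be reported as a failure.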
+        mw1 = self._make_new_mw("si1", 0)
+        mw2 = self._make_new_mw("si1", 0)
+
+        def _check_success(results):
+            result, readvs = results
+            self.failUnless(result)
+
+        def _check_failure(results):
+            result, readvs = results
+            self.failIf(result)
+
+        def _write_share(mw):
+            for i in xrange(6):
+                mw.put_block(self.block, i, self.salt)
+            mw.put_encprivkey(self.encprivkey)
+            mw.put_blockhashes(self.block_hash_tree)
+            mw.put_sharehashes(self.share_hash_chain)
+            mw.put_root_hash(self.root_hash)
+            mw.put_signature(self.signature)
+            mw.put_verification_key(self.verification_key)
+            return mw.finish_publishing()
+        d = _write_share(mw1)
+        d.addCallback(_check_success)
+        d.addCallback(lambda ignored:
+            _write_share(mw2))
+        d.addCallback(_check_failure)
+        return d
+
+
+    def test_invalid_salt_size(self):
+        # Salts need to be 16 bytes in size. Writes that attempt to
+        # write more or less than this should be rejected.
+        mw = self._make_new_mw("si1", 0)
+        invalid_salt = "a" * 17 # 17 bytes
+        another_invalid_salt = "b" * 15 # 15 bytes
+        d = defer.succeed(None)
+        d.addCallback(lambda ignored:
+            self.shouldFail(LayoutInvalid, "salt too big",
+                            None,
+                            mw.put_block, self.block, 0, invalid_salt))
+        d.addCallback(lambda ignored:
+            self.shouldFail(LayoutInvalid, "salt too small",
+                            None,
+                            mw.put_block, self.block, 0,
+                            another_invalid_salt))
+        return d
+
+
+    def test_write_test_vectors(self):
+        # If we give the write proxy a bogus test vector at
+        # any point during the process, it should fail to write when we
+        # tell it to write.
+        def _check_failure(results):
+            self.failUnlessEqual(len(results), 2)
+            res, readvs = results
+            self.failIf(res)
+
+        def _check_success(results):
+            self.failUnlessEqual(len(results), 2)
+            res, readvs = results
+            self.failUnless(res)
+
+        mw = self._make_new_mw("si1", 0)
+        mw.set_checkstring("this is a lie")
+        for i in xrange(6):
+            mw.put_block(self.block, i, self.salt)
+        mw.put_encprivkey(self.encprivkey)
+        mw.put_blockhashes(self.block_hash_tree)
+        mw.put_sharehashes(self.share_hash_chain)
+        mw.put_root_hash(self.root_hash)
+        mw.put_signature(self.signature)
+        mw.put_verification_key(self.verification_key)
+        d = mw.finish_publishing()
+        d.addCallback(_check_failure)
+        d.addCallback(lambda ignored:
+            mw.set_checkstring(""))
+        d.addCallback(lambda ignored:
+            mw.finish_publishing())
+        d.addCallback(_check_success)
+        return d
+
+
+    def serialize_blockhashes(self, blockhashes):
+        return "".join(blockhashes)
+
+
+    def serialize_sharehashes(self, sharehashes):
+        ret = "".join([struct.pack(">H32s", i, sharehashes[i])
+                       for i in sorted(sharehashes.keys())])
+        return ret
+
+
+    def test_write(self):
+        # This translates to a file with 6 6-byte segments, and with 2-byte
+        # blocks.
+        mw = self._make_new_mw("si1", 0)
+        # Test writing some blocks.
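+        # Because the MDMF writer reserves fixed-size regions for the
+        # private key, signature, verification key and share hash chain,
+        # the share data offset is known before anything is written; the
+        # reads below check that every field lands at the offset we
+        # compute here.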
+ read = self.ss.remote_slot_readv + expected_private_key_offset = struct.calcsize(MDMFHEADER) + expected_sharedata_offset = struct.calcsize(MDMFHEADER) + \ + PRIVATE_KEY_SIZE + \ + SIGNATURE_SIZE + \ + VERIFICATION_KEY_SIZE + \ + SHARE_HASH_CHAIN_SIZE + written_block_size = 2 + len(self.salt) + written_block = self.block + self.salt + for i in xrange(6): + mw.put_block(self.block, i, self.salt) + + mw.put_encprivkey(self.encprivkey) + mw.put_blockhashes(self.block_hash_tree) + mw.put_sharehashes(self.share_hash_chain) + mw.put_root_hash(self.root_hash) + mw.put_signature(self.signature) + mw.put_verification_key(self.verification_key) + d = mw.finish_publishing() + def _check_publish(results): + self.failUnlessEqual(len(results), 2) + result, ign = results + self.failUnless(result, "publish failed") + for i in xrange(6): + self.failUnlessEqual(read("si1", [0], [(expected_sharedata_offset + (i * written_block_size), written_block_size)]), + {0: [written_block]}) + + self.failUnlessEqual(len(self.encprivkey), 7) + self.failUnlessEqual(read("si1", [0], [(expected_private_key_offset, 7)]), + {0: [self.encprivkey]}) + + expected_block_hash_offset = expected_sharedata_offset + \ + (6 * written_block_size) + self.failUnlessEqual(len(self.block_hash_tree_s), 32 * 6) + self.failUnlessEqual(read("si1", [0], [(expected_block_hash_offset, 32 * 6)]), + {0: [self.block_hash_tree_s]}) + + expected_share_hash_offset = expected_private_key_offset + len(self.encprivkey) + self.failUnlessEqual(read("si1", [0],[(expected_share_hash_offset, (32 + 2) * 6)]), + {0: [self.share_hash_chain_s]}) + + self.failUnlessEqual(read("si1", [0], [(9, 32)]), + {0: [self.root_hash]}) + expected_signature_offset = expected_share_hash_offset + \ + len(self.share_hash_chain_s) + self.failUnlessEqual(len(self.signature), 9) + self.failUnlessEqual(read("si1", [0], [(expected_signature_offset, 9)]), + {0: [self.signature]}) + + expected_verification_key_offset = expected_signature_offset + len(self.signature) + self.failUnlessEqual(len(self.verification_key), 6) + self.failUnlessEqual(read("si1", [0], [(expected_verification_key_offset, 6)]), + {0: [self.verification_key]}) + + signable = mw.get_signable() + verno, seq, roothash, k, n, segsize, datalen = \ + struct.unpack(">BQ32sBBQQ", + signable) + self.failUnlessEqual(verno, 1) + self.failUnlessEqual(seq, 0) + self.failUnlessEqual(roothash, self.root_hash) + self.failUnlessEqual(k, 3) + self.failUnlessEqual(n, 10) + self.failUnlessEqual(segsize, 6) + self.failUnlessEqual(datalen, 36) + expected_eof_offset = expected_block_hash_offset + \ + len(self.block_hash_tree_s) + + # Check the version number to make sure that it is correct. + expected_version_number = struct.pack(">B", 1) + self.failUnlessEqual(read("si1", [0], [(0, 1)]), + {0: [expected_version_number]}) + # Check the sequence number to make sure that it is correct + expected_sequence_number = struct.pack(">Q", 0) + self.failUnlessEqual(read("si1", [0], [(1, 8)]), + {0: [expected_sequence_number]}) + # Check that the encoding parameters (k, N, segement size, data + # length) are what they should be. 
These are 3, 10, 6, 36 + expected_k = struct.pack(">B", 3) + self.failUnlessEqual(read("si1", [0], [(41, 1)]), + {0: [expected_k]}) + expected_n = struct.pack(">B", 10) + self.failUnlessEqual(read("si1", [0], [(42, 1)]), + {0: [expected_n]}) + expected_segment_size = struct.pack(">Q", 6) + self.failUnlessEqual(read("si1", [0], [(43, 8)]), + {0: [expected_segment_size]}) + expected_data_length = struct.pack(">Q", 36) + self.failUnlessEqual(read("si1", [0], [(51, 8)]), + {0: [expected_data_length]}) + expected_offset = struct.pack(">Q", expected_private_key_offset) + self.failUnlessEqual(read("si1", [0], [(59, 8)]), + {0: [expected_offset]}) + expected_offset = struct.pack(">Q", expected_share_hash_offset) + self.failUnlessEqual(read("si1", [0], [(67, 8)]), + {0: [expected_offset]}) + expected_offset = struct.pack(">Q", expected_signature_offset) + self.failUnlessEqual(read("si1", [0], [(75, 8)]), + {0: [expected_offset]}) + expected_offset = struct.pack(">Q", expected_verification_key_offset) + self.failUnlessEqual(read("si1", [0], [(83, 8)]), + {0: [expected_offset]}) + expected_offset = struct.pack(">Q", expected_verification_key_offset + len(self.verification_key)) + self.failUnlessEqual(read("si1", [0], [(91, 8)]), + {0: [expected_offset]}) + expected_offset = struct.pack(">Q", expected_sharedata_offset) + self.failUnlessEqual(read("si1", [0], [(99, 8)]), + {0: [expected_offset]}) + expected_offset = struct.pack(">Q", expected_block_hash_offset) + self.failUnlessEqual(read("si1", [0], [(107, 8)]), + {0: [expected_offset]}) + expected_offset = struct.pack(">Q", expected_eof_offset) + self.failUnlessEqual(read("si1", [0], [(115, 8)]), + {0: [expected_offset]}) + d.addCallback(_check_publish) + return d + + def _make_new_mw(self, si, share, datalength=36): + # This is a file of size 36 bytes. Since it has a segment + # size of 6, we know that it has 6 byte segments, which will + # be split into blocks of 2 bytes because our FEC k + # parameter is 3. + mw = MDMFSlotWriteProxy(share, self.rref, si, self.secrets, 0, 3, 10, + 6, datalength) + return mw + + + def test_write_rejected_with_too_many_blocks(self): + mw = self._make_new_mw("si0", 0) + + # Try writing too many blocks. We should not be able to write + # more than 6 + # blocks into each share. + d = defer.succeed(None) + for i in xrange(6): + d.addCallback(lambda ignored, i=i: + mw.put_block(self.block, i, self.salt)) + d.addCallback(lambda ignored: + self.shouldFail(LayoutInvalid, "too many blocks", + None, + mw.put_block, self.block, 7, self.salt)) + return d + + + def test_write_rejected_with_invalid_salt(self): + # Try writing an invalid salt. Salts are 16 bytes -- any more or + # less should cause an error. + mw = self._make_new_mw("si1", 0) + bad_salt = "a" * 17 # 17 bytes + d = defer.succeed(None) + d.addCallback(lambda ignored: + self.shouldFail(LayoutInvalid, "test_invalid_salt", + None, mw.put_block, self.block, 7, bad_salt)) + return d + + + def test_write_rejected_with_invalid_root_hash(self): + # Try writing an invalid root hash. This should be SHA256d, and + # 32 bytes long as a result. + mw = self._make_new_mw("si2", 0) + # 17 bytes != 32 bytes + invalid_root_hash = "a" * 17 + d = defer.succeed(None) + # Before this test can work, we need to put some blocks + salts, + # a block hash tree, and a share hash tree. Otherwise, we'll see + # failures that match what we are looking for, but are caused by + # the constraints imposed on operation ordering. 
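+        # (Specifically: blocks and salts, the encrypted private key,
+        # the block hash tree, and the share hash chain all have to be
+        # in place before put_root_hash is allowed.)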
+ for i in xrange(6): + d.addCallback(lambda ignored, i=i: + mw.put_block(self.block, i, self.salt)) + d.addCallback(lambda ignored: + mw.put_encprivkey(self.encprivkey)) + d.addCallback(lambda ignored: + mw.put_blockhashes(self.block_hash_tree)) + d.addCallback(lambda ignored: + mw.put_sharehashes(self.share_hash_chain)) + d.addCallback(lambda ignored: + self.shouldFail(LayoutInvalid, "invalid root hash", + None, mw.put_root_hash, invalid_root_hash)) + return d + + + def test_write_rejected_with_invalid_blocksize(self): + # The blocksize implied by the writer that we get from + # _make_new_mw is 2bytes -- any more or any less than this + # should be cause for failure, unless it is the tail segment, in + # which case it may not be failure. + invalid_block = "a" + mw = self._make_new_mw("si3", 0, 33) # implies a tail segment with + # one byte blocks + # 1 bytes != 2 bytes + d = defer.succeed(None) + d.addCallback(lambda ignored, invalid_block=invalid_block: + self.shouldFail(LayoutInvalid, "test blocksize too small", + None, mw.put_block, invalid_block, 0, + self.salt)) + invalid_block = invalid_block * 3 + # 3 bytes != 2 bytes + d.addCallback(lambda ignored: + self.shouldFail(LayoutInvalid, "test blocksize too large", + None, + mw.put_block, invalid_block, 0, self.salt)) + for i in xrange(5): + d.addCallback(lambda ignored, i=i: + mw.put_block(self.block, i, self.salt)) + # Try to put an invalid tail segment + d.addCallback(lambda ignored: + self.shouldFail(LayoutInvalid, "test invalid tail segment", + None, + mw.put_block, self.block, 5, self.salt)) + valid_block = "a" + d.addCallback(lambda ignored: + mw.put_block(valid_block, 5, self.salt)) + return d + + + def test_write_enforces_order_constraints(self): + # We require that the MDMFSlotWriteProxy be interacted with in a + # specific way. + # That way is: + # 0: __init__ + # 1: write blocks and salts + # 2: Write the encrypted private key + # 3: Write the block hashes + # 4: Write the share hashes + # 5: Write the root hash and salt hash + # 6: Write the signature and verification key + # 7: Write the file. + # + # Some of these can be performed out-of-order, and some can't. + # The dependencies that I want to test here are: + # - Private key before block hashes + # - share hashes and block hashes before root hash + # - root hash before signature + # - signature before verification key + mw0 = self._make_new_mw("si0", 0) + # Write some shares + d = defer.succeed(None) + for i in xrange(6): + d.addCallback(lambda ignored, i=i: + mw0.put_block(self.block, i, self.salt)) + + # Try to write the share hash chain without writing the + # encrypted private key + d.addCallback(lambda ignored: + self.shouldFail(LayoutInvalid, "share hash chain before " + "private key", + None, + mw0.put_sharehashes, self.share_hash_chain)) + # Write the private key. + d.addCallback(lambda ignored: + mw0.put_encprivkey(self.encprivkey)) + + # Now write the block hashes and try again + d.addCallback(lambda ignored: + mw0.put_blockhashes(self.block_hash_tree)) + + # We haven't yet put the root hash on the share, so we shouldn't + # be able to sign it. + d.addCallback(lambda ignored: + self.shouldFail(LayoutInvalid, "signature before root hash", + None, mw0.put_signature, self.signature)) + + d.addCallback(lambda ignored: + self.failUnlessRaises(LayoutInvalid, mw0.get_signable)) + + # ..and, since that fails, we also shouldn't be able to put the + # verification key. 
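+        # (The verification key apparently can't be written until the
+        # signature is in place, since it is stored directly after the
+        # signature in the MDMF layout.)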
+ d.addCallback(lambda ignored: + self.shouldFail(LayoutInvalid, "key before signature", + None, mw0.put_verification_key, + self.verification_key)) + + # Now write the share hashes. + d.addCallback(lambda ignored: + mw0.put_sharehashes(self.share_hash_chain)) + # We should be able to write the root hash now too + d.addCallback(lambda ignored: + mw0.put_root_hash(self.root_hash)) + + # We should still be unable to put the verification key + d.addCallback(lambda ignored: + self.shouldFail(LayoutInvalid, "key before signature", + None, mw0.put_verification_key, + self.verification_key)) + + d.addCallback(lambda ignored: + mw0.put_signature(self.signature)) + + # We shouldn't be able to write the offsets to the remote server + # until the offset table is finished; IOW, until we have written + # the verification key. + d.addCallback(lambda ignored: + self.shouldFail(LayoutInvalid, "offsets before verification key", + None, + mw0.finish_publishing)) + + d.addCallback(lambda ignored: + mw0.put_verification_key(self.verification_key)) + return d + + + def test_end_to_end(self): + mw = self._make_new_mw("si1", 0) + # Write a share using the mutable writer, and make sure that the + # reader knows how to read everything back to us. + d = defer.succeed(None) + for i in xrange(6): + d.addCallback(lambda ignored, i=i: + mw.put_block(self.block, i, self.salt)) + d.addCallback(lambda ignored: + mw.put_encprivkey(self.encprivkey)) + d.addCallback(lambda ignored: + mw.put_blockhashes(self.block_hash_tree)) + d.addCallback(lambda ignored: + mw.put_sharehashes(self.share_hash_chain)) + d.addCallback(lambda ignored: + mw.put_root_hash(self.root_hash)) + d.addCallback(lambda ignored: + mw.put_signature(self.signature)) + d.addCallback(lambda ignored: + mw.put_verification_key(self.verification_key)) + d.addCallback(lambda ignored: + mw.finish_publishing()) + + mr = MDMFSlotReadProxy(self.rref, "si1", 0) + def _check_block_and_salt((block, salt)): + self.failUnlessEqual(block, self.block) + self.failUnlessEqual(salt, self.salt) + + for i in xrange(6): + d.addCallback(lambda ignored, i=i: + mr.get_block_and_salt(i)) + d.addCallback(_check_block_and_salt) + + d.addCallback(lambda ignored: + mr.get_encprivkey()) + d.addCallback(lambda encprivkey: + self.failUnlessEqual(self.encprivkey, encprivkey)) + + d.addCallback(lambda ignored: + mr.get_blockhashes()) + d.addCallback(lambda blockhashes: + self.failUnlessEqual(self.block_hash_tree, blockhashes)) + + d.addCallback(lambda ignored: + mr.get_sharehashes()) + d.addCallback(lambda sharehashes: + self.failUnlessEqual(self.share_hash_chain, sharehashes)) + + d.addCallback(lambda ignored: + mr.get_signature()) + d.addCallback(lambda signature: + self.failUnlessEqual(signature, self.signature)) + + d.addCallback(lambda ignored: + mr.get_verification_key()) + d.addCallback(lambda verification_key: + self.failUnlessEqual(verification_key, self.verification_key)) + + d.addCallback(lambda ignored: + mr.get_seqnum()) + d.addCallback(lambda seqnum: + self.failUnlessEqual(seqnum, 0)) + + d.addCallback(lambda ignored: + mr.get_root_hash()) + d.addCallback(lambda root_hash: + self.failUnlessEqual(self.root_hash, root_hash)) + + d.addCallback(lambda ignored: + mr.get_encoding_parameters()) + def _check_encoding_parameters((k, n, segsize, datalen)): + self.failUnlessEqual(k, 3) + self.failUnlessEqual(n, 10) + self.failUnlessEqual(segsize, 6) + self.failUnlessEqual(datalen, 36) + d.addCallback(_check_encoding_parameters) + + d.addCallback(lambda ignored: + mr.get_checkstring()) + 
d.addCallback(lambda checkstring: + self.failUnlessEqual(checkstring, mw.get_checkstring())) + return d + + + def test_is_sdmf(self): + # The MDMFSlotReadProxy should also know how to read SDMF files, + # since it will encounter them on the grid. Callers use the + # is_sdmf method to test this. + self.write_sdmf_share_to_server("si1") + mr = MDMFSlotReadProxy(self.rref, "si1", 0) + d = mr.is_sdmf() + d.addCallback(lambda issdmf: + self.failUnless(issdmf)) + return d + + + def test_reads_sdmf(self): + # The slot read proxy should, naturally, know how to tell us + # about data in the SDMF format + self.write_sdmf_share_to_server("si1") + mr = MDMFSlotReadProxy(self.rref, "si1", 0) + d = defer.succeed(None) + d.addCallback(lambda ignored: + mr.is_sdmf()) + d.addCallback(lambda issdmf: + self.failUnless(issdmf)) + + # What do we need to read? + # - The sharedata + # - The salt + d.addCallback(lambda ignored: + mr.get_block_and_salt(0)) + def _check_block_and_salt(results): + block, salt = results + # Our original file is 36 bytes long. Then each share is 12 + # bytes in size. The share is composed entirely of the + # letter a. self.block contains 2 as, so 6 * self.block is + # what we are looking for. + self.failUnlessEqual(block, self.block * 6) + self.failUnlessEqual(salt, self.salt) + d.addCallback(_check_block_and_salt) + + # - The blockhashes + d.addCallback(lambda ignored: + mr.get_blockhashes()) + d.addCallback(lambda blockhashes: + self.failUnlessEqual(self.block_hash_tree, + blockhashes, + blockhashes)) + # - The sharehashes + d.addCallback(lambda ignored: + mr.get_sharehashes()) + d.addCallback(lambda sharehashes: + self.failUnlessEqual(self.share_hash_chain, + sharehashes)) + # - The keys + d.addCallback(lambda ignored: + mr.get_encprivkey()) + d.addCallback(lambda encprivkey: + self.failUnlessEqual(encprivkey, self.encprivkey, encprivkey)) + d.addCallback(lambda ignored: + mr.get_verification_key()) + d.addCallback(lambda verification_key: + self.failUnlessEqual(verification_key, + self.verification_key, + verification_key)) + # - The signature + d.addCallback(lambda ignored: + mr.get_signature()) + d.addCallback(lambda signature: + self.failUnlessEqual(signature, self.signature, signature)) + + # - The sequence number + d.addCallback(lambda ignored: + mr.get_seqnum()) + d.addCallback(lambda seqnum: + self.failUnlessEqual(seqnum, 0, seqnum)) + + # - The root hash + d.addCallback(lambda ignored: + mr.get_root_hash()) + d.addCallback(lambda root_hash: + self.failUnlessEqual(root_hash, self.root_hash, root_hash)) + return d + + + def test_only_reads_one_segment_sdmf(self): + # SDMF shares have only one segment, so it doesn't make sense to + # read more segments than that. The reader should know this and + # complain if we try to do that. + self.write_sdmf_share_to_server("si1") + mr = MDMFSlotReadProxy(self.rref, "si1", 0) + d = defer.succeed(None) + d.addCallback(lambda ignored: + mr.is_sdmf()) + d.addCallback(lambda issdmf: + self.failUnless(issdmf)) + d.addCallback(lambda ignored: + self.shouldFail(LayoutInvalid, "test bad segment", + None, + mr.get_block_and_salt, 1)) + return d + + + def test_read_with_prefetched_mdmf_data(self): + # The MDMFSlotReadProxy will prefill certain fields if you pass + # it data that you have already fetched. This is useful for + # cases like the Servermap, which prefetches ~2kb of data while + # finding out which shares are on the remote peer so that it + # doesn't waste round trips. 
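+        # Passing the prefetched bytes as the extra constructor argument
+        # below lets the proxy answer header queries from its cache;
+        # RemoteBucket.read_count is used to confirm that no remote
+        # reads were needed.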
+ mdmf_data = self.build_test_mdmf_share() + self.write_test_share_to_server("si1") + def _make_mr(ignored, length): + mr = MDMFSlotReadProxy(self.rref, "si1", 0, mdmf_data[:length]) + return mr + + d = defer.succeed(None) + # This should be enough to fill in both the encoding parameters + # and the table of offsets, which will complete the version + # information tuple. + d.addCallback(_make_mr, 123) + d.addCallback(lambda mr: + mr.get_verinfo()) + def _check_verinfo(verinfo): + self.failUnless(verinfo) + self.failUnlessEqual(len(verinfo), 9) + (seqnum, + root_hash, + salt_hash, + segsize, + datalen, + k, + n, + prefix, + offsets) = verinfo + self.failUnlessEqual(seqnum, 0) + self.failUnlessEqual(root_hash, self.root_hash) + self.failUnlessEqual(segsize, 6) + self.failUnlessEqual(datalen, 36) + self.failUnlessEqual(k, 3) + self.failUnlessEqual(n, 10) + expected_prefix = struct.pack(MDMFSIGNABLEHEADER, + 1, + seqnum, + root_hash, + k, + n, + segsize, + datalen) + self.failUnlessEqual(expected_prefix, prefix) + self.failUnlessEqual(self.rref.read_count, 0) + d.addCallback(_check_verinfo) + # This is not enough data to read a block and a share, so the + # wrapper should attempt to read this from the remote server. + d.addCallback(_make_mr, 123) + d.addCallback(lambda mr: + mr.get_block_and_salt(0)) + def _check_block_and_salt((block, salt)): + self.failUnlessEqual(block, self.block) + self.failUnlessEqual(salt, self.salt) + self.failUnlessEqual(self.rref.read_count, 1) + # This should be enough data to read one block. + d.addCallback(_make_mr, 123 + PRIVATE_KEY_SIZE + SIGNATURE_SIZE + VERIFICATION_KEY_SIZE + SHARE_HASH_CHAIN_SIZE + 140) + d.addCallback(lambda mr: + mr.get_block_and_salt(0)) + d.addCallback(_check_block_and_salt) + return d + + + def test_read_with_prefetched_sdmf_data(self): + sdmf_data = self.build_test_sdmf_share() + self.write_sdmf_share_to_server("si1") + def _make_mr(ignored, length): + mr = MDMFSlotReadProxy(self.rref, "si1", 0, sdmf_data[:length]) + return mr + + d = defer.succeed(None) + # This should be enough to get us the encoding parameters, + # offset table, and everything else we need to build a verinfo + # string. + d.addCallback(_make_mr, 123) + d.addCallback(lambda mr: + mr.get_verinfo()) + def _check_verinfo(verinfo): + self.failUnless(verinfo) + self.failUnlessEqual(len(verinfo), 9) + (seqnum, + root_hash, + salt, + segsize, + datalen, + k, + n, + prefix, + offsets) = verinfo + self.failUnlessEqual(seqnum, 0) + self.failUnlessEqual(root_hash, self.root_hash) + self.failUnlessEqual(salt, self.salt) + self.failUnlessEqual(segsize, 36) + self.failUnlessEqual(datalen, 36) + self.failUnlessEqual(k, 3) + self.failUnlessEqual(n, 10) + expected_prefix = struct.pack(SIGNED_PREFIX, + 0, + seqnum, + root_hash, + salt, + k, + n, + segsize, + datalen) + self.failUnlessEqual(expected_prefix, prefix) + self.failUnlessEqual(self.rref.read_count, 0) + d.addCallback(_check_verinfo) + # This shouldn't be enough to read any share data. + d.addCallback(_make_mr, 123) + d.addCallback(lambda mr: + mr.get_block_and_salt(0)) + def _check_block_and_salt((block, salt)): + self.failUnlessEqual(block, self.block * 6) + self.failUnlessEqual(salt, self.salt) + # TODO: Fix the read routine so that it reads only the data + # that it has cached if it can't read all of it. + self.failUnlessEqual(self.rref.read_count, 2) + + # This should be enough to read share data. 
+ d.addCallback(_make_mr, self.offsets['share_data']) + d.addCallback(lambda mr: + mr.get_block_and_salt(0)) + d.addCallback(_check_block_and_salt) + return d + + + def test_read_with_empty_mdmf_file(self): + # Some tests upload a file with no contents to test things + # unrelated to the actual handling of the content of the file. + # The reader should behave intelligently in these cases. + self.write_test_share_to_server("si1", empty=True) + mr = MDMFSlotReadProxy(self.rref, "si1", 0) + # We should be able to get the encoding parameters, and they + # should be correct. + d = defer.succeed(None) + d.addCallback(lambda ignored: + mr.get_encoding_parameters()) + def _check_encoding_parameters(params): + self.failUnlessEqual(len(params), 4) + k, n, segsize, datalen = params + self.failUnlessEqual(k, 3) + self.failUnlessEqual(n, 10) + self.failUnlessEqual(segsize, 0) + self.failUnlessEqual(datalen, 0) + d.addCallback(_check_encoding_parameters) + + # We should not be able to fetch a block, since there are no + # blocks to fetch + d.addCallback(lambda ignored: + self.shouldFail(LayoutInvalid, "get block on empty file", + None, + mr.get_block_and_salt, 0)) + return d + + + def test_read_with_empty_sdmf_file(self): + self.write_sdmf_share_to_server("si1", empty=True) + mr = MDMFSlotReadProxy(self.rref, "si1", 0) + # We should be able to get the encoding parameters, and they + # should be correct + d = defer.succeed(None) + d.addCallback(lambda ignored: + mr.get_encoding_parameters()) + def _check_encoding_parameters(params): + self.failUnlessEqual(len(params), 4) + k, n, segsize, datalen = params + self.failUnlessEqual(k, 3) + self.failUnlessEqual(n, 10) + self.failUnlessEqual(segsize, 0) + self.failUnlessEqual(datalen, 0) + d.addCallback(_check_encoding_parameters) + + # It does not make sense to get a block in this format, so we + # should not be able to. + d.addCallback(lambda ignored: + self.shouldFail(LayoutInvalid, "get block on an empty file", + None, + mr.get_block_and_salt, 0)) + return d + + + def test_verinfo_with_sdmf_file(self): + self.write_sdmf_share_to_server("si1") + mr = MDMFSlotReadProxy(self.rref, "si1", 0) + # We should be able to get the version information. 
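+        # For an SDMF share the third element of the verinfo tuple is
+        # the 16-byte IV/salt itself (MDMF leaves that slot blank).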
+ d = defer.succeed(None) + d.addCallback(lambda ignored: + mr.get_verinfo()) + def _check_verinfo(verinfo): + self.failUnless(verinfo) + self.failUnlessEqual(len(verinfo), 9) + (seqnum, + root_hash, + salt, + segsize, + datalen, + k, + n, + prefix, + offsets) = verinfo + self.failUnlessEqual(seqnum, 0) + self.failUnlessEqual(root_hash, self.root_hash) + self.failUnlessEqual(salt, self.salt) + self.failUnlessEqual(segsize, 36) + self.failUnlessEqual(datalen, 36) + self.failUnlessEqual(k, 3) + self.failUnlessEqual(n, 10) + expected_prefix = struct.pack(">BQ32s16s BBQQ", + 0, + seqnum, + root_hash, + salt, + k, + n, + segsize, + datalen) + self.failUnlessEqual(prefix, expected_prefix) + self.failUnlessEqual(offsets, self.offsets) + d.addCallback(_check_verinfo) + return d + + + def test_verinfo_with_mdmf_file(self): + self.write_test_share_to_server("si1") + mr = MDMFSlotReadProxy(self.rref, "si1", 0) + d = defer.succeed(None) + d.addCallback(lambda ignored: + mr.get_verinfo()) + def _check_verinfo(verinfo): + self.failUnless(verinfo) + self.failUnlessEqual(len(verinfo), 9) + (seqnum, + root_hash, + IV, + segsize, + datalen, + k, + n, + prefix, + offsets) = verinfo + self.failUnlessEqual(seqnum, 0) + self.failUnlessEqual(root_hash, self.root_hash) + self.failIf(IV) + self.failUnlessEqual(segsize, 6) + self.failUnlessEqual(datalen, 36) + self.failUnlessEqual(k, 3) + self.failUnlessEqual(n, 10) + expected_prefix = struct.pack(">BQ32s BBQQ", + 1, + seqnum, + root_hash, + k, + n, + segsize, + datalen) + self.failUnlessEqual(prefix, expected_prefix) + self.failUnlessEqual(offsets, self.offsets) + d.addCallback(_check_verinfo) + return d + + + def test_reader_queue(self): + self.write_test_share_to_server('si1') + mr = MDMFSlotReadProxy(self.rref, "si1", 0) + d1 = mr.get_block_and_salt(0, queue=True) + d2 = mr.get_blockhashes(queue=True) + d3 = mr.get_sharehashes(queue=True) + d4 = mr.get_signature(queue=True) + d5 = mr.get_verification_key(queue=True) + dl = defer.DeferredList([d1, d2, d3, d4, d5]) + mr.flush() + def _print(results): + self.failUnlessEqual(len(results), 5) + # We have one read for version information and offsets, and + # one for everything else. + self.failUnlessEqual(self.rref.read_count, 2) + block, salt = results[0][1] # results[0] is a boolean that says + # whether or not the operation + # worked. + self.failUnlessEqual(self.block, block) + self.failUnlessEqual(self.salt, salt) + + blockhashes = results[1][1] + self.failUnlessEqual(self.block_hash_tree, blockhashes) + + sharehashes = results[2][1] + self.failUnlessEqual(self.share_hash_chain, sharehashes) + + signature = results[3][1] + self.failUnlessEqual(self.signature, signature) + + verification_key = results[4][1] + self.failUnlessEqual(self.verification_key, verification_key) + dl.addCallback(_print) + return dl + + + def test_sdmf_writer(self): + # Go through the motions of writing an SDMF share to the storage + # server. Then read the storage server to see that the share got + # written in the way that we think it should have. + + # We do this first so that the necessary instance variables get + # set the way we want them for the tests below. + data = self.build_test_sdmf_share() + sdmfr = SDMFSlotWriteProxy(0, + self.rref, + "si1", + self.secrets, + 0, 3, 10, 36, 36) + # Put the block and salt. 
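+        # SDMF files have a single segment, so segment 0 carries all of
+        # the share data; the writer buffers everything locally and only
+        # touches the server when finish_publishing is called (checked
+        # below via rref.write_count).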
+ sdmfr.put_block(self.blockdata, 0, self.salt) + + # Put the encprivkey + sdmfr.put_encprivkey(self.encprivkey) + + # Put the block and share hash chains + sdmfr.put_blockhashes(self.block_hash_tree) + sdmfr.put_sharehashes(self.share_hash_chain) + sdmfr.put_root_hash(self.root_hash) + + # Put the signature + sdmfr.put_signature(self.signature) + + # Put the verification key + sdmfr.put_verification_key(self.verification_key) + + # Now check to make sure that nothing has been written yet. + self.failUnlessEqual(self.rref.write_count, 0) + + # Now finish publishing + d = sdmfr.finish_publishing() + def _then(ignored): + self.failUnlessEqual(self.rref.write_count, 1) + read = self.ss.remote_slot_readv + self.failUnlessEqual(read("si1", [0], [(0, len(data))]), + {0: [data]}) + d.addCallback(_then) + return d + + + def test_sdmf_writer_preexisting_share(self): + data = self.build_test_sdmf_share() + self.write_sdmf_share_to_server("si1") + + # Now there is a share on the storage server. To successfully + # write, we need to set the checkstring correctly. When we + # don't, no write should occur. + sdmfw = SDMFSlotWriteProxy(0, + self.rref, + "si1", + self.secrets, + 1, 3, 10, 36, 36) + sdmfw.put_block(self.blockdata, 0, self.salt) + + # Put the encprivkey + sdmfw.put_encprivkey(self.encprivkey) + + # Put the block and share hash chains + sdmfw.put_blockhashes(self.block_hash_tree) + sdmfw.put_sharehashes(self.share_hash_chain) + + # Put the root hash + sdmfw.put_root_hash(self.root_hash) + + # Put the signature + sdmfw.put_signature(self.signature) + + # Put the verification key + sdmfw.put_verification_key(self.verification_key) + + # We shouldn't have a checkstring yet + self.failUnlessEqual(sdmfw.get_checkstring(), "") + + d = sdmfw.finish_publishing() + def _then(results): + self.failIf(results[0]) + # this is the correct checkstring + self._expected_checkstring = results[1][0][0] + return self._expected_checkstring + + d.addCallback(_then) + d.addCallback(sdmfw.set_checkstring) + d.addCallback(lambda ignored: + sdmfw.get_checkstring()) + d.addCallback(lambda checkstring: + self.failUnlessEqual(checkstring, self._expected_checkstring)) + d.addCallback(lambda ignored: + sdmfw.finish_publishing()) + def _then_again(results): + self.failUnless(results[0]) + read = self.ss.remote_slot_readv + self.failUnlessEqual(read("si1", [0], [(1, 8)]), + {0: [struct.pack(">Q", 1)]}) + self.failUnlessEqual(read("si1", [0], [(9, len(data) - 9)]), + {0: [data[9:]]}) + d.addCallback(_then_again) + return d + + class Stats(unittest.TestCase): def setUp(self):