vdrive: protect dirnode contents with an HMAC

This commit is contained in:
Brian Warner 2007-06-26 12:36:21 -07:00
parent 4a0682874b
commit bc2603c818
3 changed files with 57 additions and 9 deletions

View File

@ -255,3 +255,35 @@ class Test(unittest.TestCase):
sorted(expected_keys))
return res
def flip_bit(data, offset):
if offset < 0:
offset = len(data) + offset
return data[:offset] + chr(ord(data[offset]) ^ 0x01) + data[offset+1:]
class Encryption(unittest.TestCase):
def test_loopback(self):
key = "k" * 16
data = "This is some plaintext data."
crypttext = vdrive.encrypt(key, data)
plaintext = vdrive.decrypt(key, crypttext)
self.failUnlessEqual(data, plaintext)
def test_hmac(self):
key = "j" * 16
data = "This is some more plaintext data."
crypttext = vdrive.encrypt(key, data)
# flip a bit in the IV
self.failUnlessRaises(vdrive.IntegrityCheckError,
vdrive.decrypt,
key, flip_bit(crypttext, 0))
# flip a bit in the crypttext
self.failUnlessRaises(vdrive.IntegrityCheckError,
vdrive.decrypt,
key, flip_bit(crypttext, 16))
# flip a bit in the HMAC
self.failUnlessRaises(vdrive.IntegrityCheckError,
vdrive.decrypt,
key, flip_bit(crypttext, -1))
plaintext = vdrive.decrypt(key, crypttext)
self.failUnlessEqual(data, plaintext)

View File

@ -79,3 +79,13 @@ def generate_dirnode_keys_from_writekey(write_key):
def generate_dirnode_keys_from_readkey(read_key):
index = dir_index_hash(read_key)
return None, None, read_key, index
def _xor(a, b):
return "".join([chr(ord(c) ^ ord(b)) for c in a])
def hmac(tag, data):
ikey = _xor(tag, "\x36")
okey = _xor(tag, "\x5c")
h1 = SHA256.new(ikey + data).digest()
h2 = SHA256.new(okey + h1).digest()
return h2

View File

@ -27,23 +27,29 @@ def create_directory_node(client, diruri):
d.addCallback(_got)
return d
IV_LENGTH = 14
def encrypt(key, data):
# TODO: add the hmac
IV = os.urandom(14)
counterstart = IV + "\x00"*2
IV = os.urandom(IV_LENGTH)
counterstart = IV + "\x00"*(16-IV_LENGTH)
assert len(counterstart) == 16, len(counterstart)
cryptor = AES.new(key=key, mode=AES.MODE_CTR, counterstart=counterstart)
crypttext = cryptor.encrypt(data)
return IV + crypttext
mac = hashutil.hmac(key, IV + crypttext)
assert len(mac) == 32
return IV + crypttext + mac
class IntegrityCheckError(Exception):
pass
def decrypt(key, data):
# TODO: validate the hmac
assert len(data) >= 14, len(data)
IV = data[:14]
counterstart = IV + "\x00"*2
assert len(data) >= (32+IV_LENGTH), len(data)
IV, crypttext, mac = data[:IV_LENGTH], data[IV_LENGTH:-32], data[-32:]
if mac != hashutil.hmac(key, IV+crypttext):
raise IntegrityCheckError("HMAC does not match, crypttext is corrupted")
counterstart = IV + "\x00"*(16-IV_LENGTH)
assert len(counterstart) == 16, len(counterstart)
cryptor = AES.new(key=key, mode=AES.MODE_CTR, counterstart=counterstart)
plaintext = cryptor.decrypt(data[14:])
plaintext = cryptor.decrypt(crypttext)
return plaintext