Merge pull request #1152 from tahoe-lafs/3833.container-format-abstraction

Container header format abstractions

Fixes: ticket:3833
This commit is contained in:
Jean-Paul Calderone 2021-11-03 13:05:10 -04:00 committed by GitHub
commit 8fbbc913ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 188 additions and 156 deletions

0
newsfragments/3833.minor Normal file
View File

View File

@ -15,15 +15,22 @@ try:
except ImportError: except ImportError:
pass pass
# do not import any allmydata modules at this level. Do that from inside
# individual functions instead.
import struct, time, os, sys import struct, time, os, sys
from twisted.python import usage, failure from twisted.python import usage, failure
from twisted.internet import defer from twisted.internet import defer
from foolscap.logging import cli as foolscap_cli from foolscap.logging import cli as foolscap_cli
from allmydata.scripts.common import BaseOptions
from allmydata.scripts.common import BaseOptions
from allmydata import uri
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import ShareFile
from allmydata.mutable.layout import unpack_share
from allmydata.mutable.layout import MDMFSlotReadProxy
from allmydata.mutable.common import NeedMoreDataError
from allmydata.immutable.layout import ReadBucketProxy
from allmydata.util import base32
from allmydata.util.encodingutil import quote_output
class DumpOptions(BaseOptions): class DumpOptions(BaseOptions):
def getSynopsis(self): def getSynopsis(self):
@ -56,13 +63,11 @@ def dump_share(options):
# check the version, to see if we have a mutable or immutable share # check the version, to see if we have a mutable or immutable share
print("share filename: %s" % quote_output(options['filename']), file=out) print("share filename: %s" % quote_output(options['filename']), file=out)
f = open(options['filename'], "rb") with open(options['filename'], "rb") as f:
prefix = f.read(32) if MutableShareFile.is_valid_header(f.read(32)):
f.close() return dump_mutable_share(options)
if prefix == MutableShareFile.MAGIC: # otherwise assume it's immutable
return dump_mutable_share(options) return dump_immutable_share(options)
# otherwise assume it's immutable
return dump_immutable_share(options)
def dump_immutable_share(options): def dump_immutable_share(options):
from allmydata.storage.immutable import ShareFile from allmydata.storage.immutable import ShareFile
@ -712,125 +717,122 @@ def call(c, *args, **kwargs):
return results[0] return results[0]
def describe_share(abs_sharefile, si_s, shnum_s, now, out): def describe_share(abs_sharefile, si_s, shnum_s, now, out):
from allmydata import uri with open(abs_sharefile, "rb") as f:
from allmydata.storage.mutable import MutableShareFile prefix = f.read(32)
from allmydata.storage.immutable import ShareFile if MutableShareFile.is_valid_header(prefix):
from allmydata.mutable.layout import unpack_share _describe_mutable_share(abs_sharefile, f, now, si_s, out)
from allmydata.mutable.common import NeedMoreDataError elif ShareFile.is_valid_header(prefix):
from allmydata.immutable.layout import ReadBucketProxy _describe_immutable_share(abs_sharefile, now, si_s, out)
from allmydata.util import base32
from allmydata.util.encodingutil import quote_output
import struct
f = open(abs_sharefile, "rb")
prefix = f.read(32)
if prefix == MutableShareFile.MAGIC:
# mutable share
m = MutableShareFile(abs_sharefile)
WE, nodeid = m._read_write_enabler_and_nodeid(f)
data_length = m._read_data_length(f)
expiration_time = min( [lease.get_expiration_time()
for (i,lease) in m._enumerate_leases(f)] )
expiration = max(0, expiration_time - now)
share_type = "unknown"
f.seek(m.DATA_OFFSET)
version = f.read(1)
if version == b"\x00":
# this slot contains an SDMF share
share_type = "SDMF"
elif version == b"\x01":
share_type = "MDMF"
if share_type == "SDMF":
f.seek(m.DATA_OFFSET)
data = f.read(min(data_length, 2000))
try:
pieces = unpack_share(data)
except NeedMoreDataError as e:
# retry once with the larger size
size = e.needed_bytes
f.seek(m.DATA_OFFSET)
data = f.read(min(data_length, size))
pieces = unpack_share(data)
(seqnum, root_hash, IV, k, N, segsize, datalen,
pubkey, signature, share_hash_chain, block_hash_tree,
share_data, enc_privkey) = pieces
print("SDMF %s %d/%d %d #%d:%s %d %s" % \
(si_s, k, N, datalen,
seqnum, str(base32.b2a(root_hash), "utf-8"),
expiration, quote_output(abs_sharefile)), file=out)
elif share_type == "MDMF":
from allmydata.mutable.layout import MDMFSlotReadProxy
fake_shnum = 0
# TODO: factor this out with dump_MDMF_share()
class ShareDumper(MDMFSlotReadProxy):
def _read(self, readvs, force_remote=False, queue=False):
data = []
for (where,length) in readvs:
f.seek(m.DATA_OFFSET+where)
data.append(f.read(length))
return defer.succeed({fake_shnum: data})
p = ShareDumper(None, "fake-si", fake_shnum)
def extract(func):
stash = []
# these methods return Deferreds, but we happen to know that
# they run synchronously when not actually talking to a
# remote server
d = func()
d.addCallback(stash.append)
return stash[0]
verinfo = extract(p.get_verinfo)
(seqnum, root_hash, salt_to_use, segsize, datalen, k, N, prefix,
offsets) = verinfo
print("MDMF %s %d/%d %d #%d:%s %d %s" % \
(si_s, k, N, datalen,
seqnum, str(base32.b2a(root_hash), "utf-8"),
expiration, quote_output(abs_sharefile)), file=out)
else: else:
print("UNKNOWN mutable %s" % quote_output(abs_sharefile), file=out) print("UNKNOWN really-unknown %s" % quote_output(abs_sharefile), file=out)
elif struct.unpack(">L", prefix[:4]) == (1,): def _describe_mutable_share(abs_sharefile, f, now, si_s, out):
# immutable # mutable share
m = MutableShareFile(abs_sharefile)
WE, nodeid = m._read_write_enabler_and_nodeid(f)
data_length = m._read_data_length(f)
expiration_time = min( [lease.get_expiration_time()
for (i,lease) in m._enumerate_leases(f)] )
expiration = max(0, expiration_time - now)
class ImmediateReadBucketProxy(ReadBucketProxy): share_type = "unknown"
def __init__(self, sf): f.seek(m.DATA_OFFSET)
self.sf = sf version = f.read(1)
ReadBucketProxy.__init__(self, None, None, "") if version == b"\x00":
def __repr__(self): # this slot contains an SDMF share
return "<ImmediateReadBucketProxy>" share_type = "SDMF"
def _read(self, offset, size): elif version == b"\x01":
return defer.succeed(sf.read_share_data(offset, size)) share_type = "MDMF"
# use a ReadBucketProxy to parse the bucket and find the uri extension if share_type == "SDMF":
sf = ShareFile(abs_sharefile) f.seek(m.DATA_OFFSET)
bp = ImmediateReadBucketProxy(sf)
expiration_time = min( [lease.get_expiration_time() # Read at least the mutable header length, if possible. If there's
for lease in sf.get_leases()] ) # less data than that in the share, don't try to read more (we won't
expiration = max(0, expiration_time - now) # be able to unpack the header in this case but we surely don't want
# to try to unpack bytes *following* the data section as if they were
# header data). Rather than 2000 we could use HEADER_LENGTH from
# allmydata/mutable/layout.py, probably.
data = f.read(min(data_length, 2000))
UEB_data = call(bp.get_uri_extension) try:
unpacked = uri.unpack_extension_readable(UEB_data) pieces = unpack_share(data)
except NeedMoreDataError as e:
# retry once with the larger size
size = e.needed_bytes
f.seek(m.DATA_OFFSET)
data = f.read(min(data_length, size))
pieces = unpack_share(data)
(seqnum, root_hash, IV, k, N, segsize, datalen,
pubkey, signature, share_hash_chain, block_hash_tree,
share_data, enc_privkey) = pieces
k = unpacked["needed_shares"] print("SDMF %s %d/%d %d #%d:%s %d %s" % \
N = unpacked["total_shares"] (si_s, k, N, datalen,
filesize = unpacked["size"] seqnum, str(base32.b2a(root_hash), "utf-8"),
ueb_hash = unpacked["UEB_hash"] expiration, quote_output(abs_sharefile)), file=out)
elif share_type == "MDMF":
fake_shnum = 0
# TODO: factor this out with dump_MDMF_share()
class ShareDumper(MDMFSlotReadProxy):
def _read(self, readvs, force_remote=False, queue=False):
data = []
for (where,length) in readvs:
f.seek(m.DATA_OFFSET+where)
data.append(f.read(length))
return defer.succeed({fake_shnum: data})
print("CHK %s %d/%d %d %s %d %s" % (si_s, k, N, filesize, p = ShareDumper(None, "fake-si", fake_shnum)
str(ueb_hash, "utf-8"), expiration, def extract(func):
quote_output(abs_sharefile)), file=out) stash = []
# these methods return Deferreds, but we happen to know that
# they run synchronously when not actually talking to a
# remote server
d = func()
d.addCallback(stash.append)
return stash[0]
verinfo = extract(p.get_verinfo)
(seqnum, root_hash, salt_to_use, segsize, datalen, k, N, prefix,
offsets) = verinfo
print("MDMF %s %d/%d %d #%d:%s %d %s" % \
(si_s, k, N, datalen,
seqnum, str(base32.b2a(root_hash), "utf-8"),
expiration, quote_output(abs_sharefile)), file=out)
else: else:
print("UNKNOWN really-unknown %s" % quote_output(abs_sharefile), file=out) print("UNKNOWN mutable %s" % quote_output(abs_sharefile), file=out)
def _describe_immutable_share(abs_sharefile, now, si_s, out):
class ImmediateReadBucketProxy(ReadBucketProxy):
def __init__(self, sf):
self.sf = sf
ReadBucketProxy.__init__(self, None, None, "")
def __repr__(self):
return "<ImmediateReadBucketProxy>"
def _read(self, offset, size):
return defer.succeed(sf.read_share_data(offset, size))
# use a ReadBucketProxy to parse the bucket and find the uri extension
sf = ShareFile(abs_sharefile)
bp = ImmediateReadBucketProxy(sf)
expiration_time = min(lease.get_expiration_time()
for lease in sf.get_leases())
expiration = max(0, expiration_time - now)
UEB_data = call(bp.get_uri_extension)
unpacked = uri.unpack_extension_readable(UEB_data)
k = unpacked["needed_shares"]
N = unpacked["total_shares"]
filesize = unpacked["size"]
ueb_hash = unpacked["UEB_hash"]
print("CHK %s %d/%d %d %s %d %s" % (si_s, k, N, filesize,
str(ueb_hash, "utf-8"), expiration,
quote_output(abs_sharefile)), file=out)
f.close()
def catalog_shares(options): def catalog_shares(options):
from allmydata.util.encodingutil import listdir_unicode, quote_output from allmydata.util.encodingutil import listdir_unicode, quote_output
@ -933,34 +935,35 @@ def corrupt_share(options):
f.write(d) f.write(d)
f.close() f.close()
f = open(fn, "rb") with open(fn, "rb") as f:
prefix = f.read(32) prefix = f.read(32)
f.close()
if prefix == MutableShareFile.MAGIC:
# mutable
m = MutableShareFile(fn)
f = open(fn, "rb")
f.seek(m.DATA_OFFSET)
data = f.read(2000)
# make sure this slot contains an SDMF share
assert data[0:1] == b"\x00", "non-SDMF mutable shares not supported"
f.close()
(version, ig_seqnum, ig_roothash, ig_IV, ig_k, ig_N, ig_segsize, if MutableShareFile.is_valid_header(prefix):
ig_datalen, offsets) = unpack_header(data) # mutable
m = MutableShareFile(fn)
with open(fn, "rb") as f:
f.seek(m.DATA_OFFSET)
# Read enough data to get a mutable header to unpack.
data = f.read(2000)
# make sure this slot contains an SDMF share
assert data[0:1] == b"\x00", "non-SDMF mutable shares not supported"
f.close()
assert version == 0, "we only handle v0 SDMF files" (version, ig_seqnum, ig_roothash, ig_IV, ig_k, ig_N, ig_segsize,
start = m.DATA_OFFSET + offsets["share_data"] ig_datalen, offsets) = unpack_header(data)
end = m.DATA_OFFSET + offsets["enc_privkey"]
flip_bit(start, end) assert version == 0, "we only handle v0 SDMF files"
else: start = m.DATA_OFFSET + offsets["share_data"]
# otherwise assume it's immutable end = m.DATA_OFFSET + offsets["enc_privkey"]
f = ShareFile(fn) flip_bit(start, end)
bp = ReadBucketProxy(None, None, '') else:
offsets = bp._parse_offsets(f.read_share_data(0, 0x24)) # otherwise assume it's immutable
start = f._data_offset + offsets["data"] f = ShareFile(fn)
end = f._data_offset + offsets["plaintext_hash_tree"] bp = ReadBucketProxy(None, None, '')
flip_bit(start, end) offsets = bp._parse_offsets(f.read_share_data(0, 0x24))
start = f._data_offset + offsets["data"]
end = f._data_offset + offsets["plaintext_hash_tree"]
flip_bit(start, end)

View File

@ -57,6 +57,21 @@ class ShareFile(object):
LEASE_SIZE = struct.calcsize(">L32s32sL") LEASE_SIZE = struct.calcsize(">L32s32sL")
sharetype = "immutable" sharetype = "immutable"
@classmethod
def is_valid_header(cls, header):
# type: (bytes) -> bool
"""
Determine if the given bytes constitute a valid header for this type of
container.
:param header: Some bytes from the beginning of a container.
:return: ``True`` if the bytes could belong to this container,
``False`` otherwise.
"""
(version,) = struct.unpack(">L", header[:4])
return version == 1
def __init__(self, filename, max_size=None, create=False): def __init__(self, filename, max_size=None, create=False):
""" If max_size is not None then I won't allow more than max_size to be written to me. If create=True and max_size must not be None. """ """ If max_size is not None then I won't allow more than max_size to be written to me. If create=True and max_size must not be None. """
precondition((max_size is not None) or (not create), max_size, create) precondition((max_size is not None) or (not create), max_size, create)

View File

@ -67,6 +67,20 @@ class MutableShareFile(object):
MAX_SIZE = MAX_MUTABLE_SHARE_SIZE MAX_SIZE = MAX_MUTABLE_SHARE_SIZE
# TODO: decide upon a policy for max share size # TODO: decide upon a policy for max share size
@classmethod
def is_valid_header(cls, header):
    # type: (bytes) -> bool
    """
    Determine if the given bytes constitute a valid header for this type of
    container.

    :param header: Some bytes from the beginning of a container.

    :return: ``True`` if the bytes could belong to this container,
        ``False`` otherwise.
    """
    # Mutable share containers are recognized purely by their leading
    # magic byte string.
    magic = cls.MAGIC
    return header[:len(magic)] == magic
def __init__(self, filename, parent=None): def __init__(self, filename, parent=None):
self.home = filename self.home = filename
if os.path.exists(self.home): if os.path.exists(self.home):
@ -77,7 +91,7 @@ class MutableShareFile(object):
write_enabler_nodeid, write_enabler, write_enabler_nodeid, write_enabler,
data_length, extra_least_offset) = \ data_length, extra_least_offset) = \
struct.unpack(">32s20s32sQQ", data) struct.unpack(">32s20s32sQQ", data)
if magic != self.MAGIC: if not self.is_valid_header(data):
msg = "sharefile %s had magic '%r' but we wanted '%r'" % \ msg = "sharefile %s had magic '%r' but we wanted '%r'" % \
(filename, magic, self.MAGIC) (filename, magic, self.MAGIC)
raise UnknownMutableContainerVersionError(msg) raise UnknownMutableContainerVersionError(msg)
@ -388,7 +402,7 @@ class MutableShareFile(object):
write_enabler_nodeid, write_enabler, write_enabler_nodeid, write_enabler,
data_length, extra_least_offset) = \ data_length, extra_least_offset) = \
struct.unpack(">32s20s32sQQ", data) struct.unpack(">32s20s32sQQ", data)
assert magic == self.MAGIC assert self.is_valid_header(data)
return (write_enabler, write_enabler_nodeid) return (write_enabler, write_enabler_nodeid)
def readv(self, readv): def readv(self, readv):

View File

@ -14,7 +14,7 @@ if PY2:
else: else:
from typing import Dict from typing import Dict
import os, re, struct, time import os, re, time
import six import six
from foolscap.api import Referenceable from foolscap.api import Referenceable
@ -373,12 +373,12 @@ class StorageServer(service.MultiService, Referenceable):
for shnum, filename in self._get_bucket_shares(storage_index): for shnum, filename in self._get_bucket_shares(storage_index):
with open(filename, 'rb') as f: with open(filename, 'rb') as f:
header = f.read(32) header = f.read(32)
if header[:32] == MutableShareFile.MAGIC: if MutableShareFile.is_valid_header(header):
sf = MutableShareFile(filename, self) sf = MutableShareFile(filename, self)
# note: if the share has been migrated, the renew_lease() # note: if the share has been migrated, the renew_lease()
# call will throw an exception, with information to help the # call will throw an exception, with information to help the
# client update the lease. # client update the lease.
elif header[:4] == struct.pack(">L", 1): elif ShareFile.is_valid_header(header):
sf = ShareFile(filename) sf = ShareFile(filename)
else: else:
continue # non-sharefile continue # non-sharefile

View File

@ -17,8 +17,7 @@ from allmydata.storage.immutable import ShareFile
def get_share_file(filename): def get_share_file(filename):
with open(filename, "rb") as f: with open(filename, "rb") as f:
prefix = f.read(32) prefix = f.read(32)
if prefix == MutableShareFile.MAGIC: if MutableShareFile.is_valid_header(prefix):
return MutableShareFile(filename) return MutableShareFile(filename)
# otherwise assume it's immutable # otherwise assume it's immutable
return ShareFile(filename) return ShareFile(filename)

View File

@ -1068,7 +1068,7 @@ def _corrupt_offset_of_uri_extension_to_force_short_read(data, debug=False):
def _corrupt_mutable_share_data(data, debug=False): def _corrupt_mutable_share_data(data, debug=False):
prefix = data[:32] prefix = data[:32]
assert prefix == MutableShareFile.MAGIC, "This function is designed to corrupt mutable shares of v1, and the magic number doesn't look right: %r vs %r" % (prefix, MutableShareFile.MAGIC) assert MutableShareFile.is_valid_header(prefix), "This function is designed to corrupt mutable shares of v1, and the magic number doesn't look right: %r vs %r" % (prefix, MutableShareFile.MAGIC)
data_offset = MutableShareFile.DATA_OFFSET data_offset = MutableShareFile.DATA_OFFSET
sharetype = data[data_offset:data_offset+1] sharetype = data[data_offset:data_offset+1]
assert sharetype == b"\x00", "non-SDMF mutable shares not supported" assert sharetype == b"\x00", "non-SDMF mutable shares not supported"

View File

@ -23,6 +23,7 @@ from twisted.internet import defer
from allmydata import uri from allmydata import uri
from allmydata.storage.mutable import MutableShareFile from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import ShareFile
from allmydata.storage.server import si_a2b from allmydata.storage.server import si_a2b
from allmydata.immutable import offloaded, upload from allmydata.immutable import offloaded, upload
from allmydata.immutable.literal import LiteralFileNode from allmydata.immutable.literal import LiteralFileNode
@ -1290,9 +1291,9 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
# are sharefiles here # are sharefiles here
filename = os.path.join(dirpath, filenames[0]) filename = os.path.join(dirpath, filenames[0])
# peek at the magic to see if it is a chk share # peek at the magic to see if it is a chk share
magic = open(filename, "rb").read(4) with open(filename, "rb") as f:
if magic == b'\x00\x00\x00\x01': if ShareFile.is_valid_header(f.read(32)):
break break
else: else:
self.fail("unable to find any uri_extension files in %r" self.fail("unable to find any uri_extension files in %r"
% self.basedir) % self.basedir)