Merge branch 'master' into 2916.grid-manager-integration-tests.2

commit ecfc17cb57
Author: meejah
Date: 2022-09-26 18:56:19 -06:00
13 changed files with 121 additions and 94 deletions

View File

@@ -264,3 +264,18 @@ the "tahoe-conf" file for notes about configuration and installing these
plugins into a Munin environment.
.. _Munin: http://munin-monitoring.org/
Scraping Stats Values in OpenMetrics Format
===========================================
Time Series DataBase (TSDB) software like Prometheus_ and VictoriaMetrics_ can
parse statistics in OpenMetrics_ format from a URL such as
http://localhost:3456/statistics?t=openmetrics. Software like Grafana_ can then be used to graph
and alert on these numbers. You can find a pre-configured dashboard for
Grafana at https://grafana.com/grafana/dashboards/16894-tahoe-lafs/.
.. _OpenMetrics: https://openmetrics.io/
.. _Prometheus: https://prometheus.io/
.. _VictoriaMetrics: https://victoriametrics.com/
.. _Grafana: https://grafana.com/
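As a quick way to check the endpoint by hand, the statistics can be fetched with nothing but the Python standard library. A minimal sketch, assuming a node whose web API listens on localhost:3456 as in the URL above:

from urllib.request import urlopen

# Fetch the node's statistics in OpenMetrics format (plain text).
url = "http://localhost:3456/statistics?t=openmetrics"
with urlopen(url) as response:
    body = response.read().decode("utf-8")

# OpenMetrics is line-oriented; show the first few samples.
for line in body.splitlines()[:10]:
    print(line)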

newsfragments/3786.minor Normal file
View File

@@ -0,0 +1 @@
Added reStructuredText documentation for the OpenMetrics format statistics endpoint.

newsfragments/3915.minor Normal file
View File

newsfragments/3916.minor Normal file
View File

View File

@@ -694,3 +694,24 @@ class Encoder(object):
return self.uri_extension_data
def get_uri_extension_hash(self):
return self.uri_extension_hash
def get_uri_extension_size(self):
"""
Calculate the size of the URI extension that gets written at the end of
immutables.
This may be done earlier than actual encoding, so e.g. we might not
know the crypttext hashes, but that's fine for our purposes since we
only care about the length.
"""
params = self.uri_extension_data.copy()
params["crypttext_hash"] = b"\x00" * hashutil.CRYPTO_VAL_SIZE
params["crypttext_root_hash"] = b"\x00" * hashutil.CRYPTO_VAL_SIZE
params["share_root_hash"] = b"\x00" * hashutil.CRYPTO_VAL_SIZE
assert params.keys() == {
"codec_name", "codec_params", "size", "segment_size", "num_segments",
"needed_shares", "total_shares", "tail_codec_params",
"crypttext_hash", "crypttext_root_hash", "share_root_hash"
}, params.keys()
uri_extension = uri.pack_extension(params)
return len(uri_extension)
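The placeholder trick above works because each of the three hash fields has a fixed byte length (CRYPTO_VAL_SIZE), so the packed size does not depend on the hash values themselves. A toy illustration of that property, using a hypothetical length-prefixed encoder rather than Tahoe's real uri.pack_extension:

def toy_pack(params):
    # Hypothetical stand-in for uri.pack_extension: a length-prefixed
    # key:value encoding whose output size depends only on value lengths.
    out = b""
    for key in sorted(params):
        value = params[key]
        if not isinstance(value, bytes):
            value = str(value).encode("ascii")
        out += b"%s:%d:%s," % (key.encode("ascii"), len(value), value)
    return out

placeholder = {"crypttext_hash": b"\x00" * 32}
real = {"crypttext_hash": b"\xab" * 32}
# Zero-filled placeholders pack to exactly the same length as real hashes.
assert len(toy_pack(placeholder)) == len(toy_pack(real))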

View File

@@ -19,6 +19,7 @@ from allmydata.util import mathutil, observer, pipeline, log
from allmydata.util.assertutil import precondition
from allmydata.storage.server import si_b2a
class LayoutInvalid(Exception):
""" There is something wrong with these bytes so they can't be
interpreted as the kind of immutable file that I know how to download."""
@@ -90,7 +91,7 @@ FORCE_V2 = False # set briefly by unit tests to make small-sized V2 shares
def make_write_bucket_proxy(rref, server,
data_size, block_size, num_segments,
num_share_hashes, uri_extension_size_max):
num_share_hashes, uri_extension_size):
# Use layout v1 for small files, so they'll be readable by older versions
# (<tahoe-1.3.0). Use layout v2 for large files; they'll only be readable
# by tahoe-1.3.0 or later.
@@ -99,11 +100,11 @@ def make_write_bucket_proxy(rref, server,
raise FileTooLargeError
wbp = WriteBucketProxy(rref, server,
data_size, block_size, num_segments,
num_share_hashes, uri_extension_size_max)
num_share_hashes, uri_extension_size)
except FileTooLargeError:
wbp = WriteBucketProxy_v2(rref, server,
data_size, block_size, num_segments,
num_share_hashes, uri_extension_size_max)
num_share_hashes, uri_extension_size)
return wbp
@implementer(IStorageBucketWriter)
@@ -112,20 +113,20 @@ class WriteBucketProxy(object):
fieldstruct = ">L"
def __init__(self, rref, server, data_size, block_size, num_segments,
num_share_hashes, uri_extension_size_max, pipeline_size=50000):
num_share_hashes, uri_extension_size, pipeline_size=50000):
self._rref = rref
self._server = server
self._data_size = data_size
self._block_size = block_size
self._num_segments = num_segments
self._written_bytes = 0
effective_segments = mathutil.next_power_of_k(num_segments,2)
self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
# how many share hashes are included in each share? This will be
# about ln2(num_shares).
self._share_hashtree_size = num_share_hashes * (2+HASH_SIZE)
# we commit to not sending a uri extension larger than this
self._uri_extension_size_max = uri_extension_size_max
self._uri_extension_size = uri_extension_size
self._create_offsets(block_size, data_size)
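For intuition about the _segment_hash_size arithmetic above: the segment count is rounded up to the next power of two, and a complete binary hash tree over n leaves has 2n - 1 nodes. A small worked sketch, assuming HASH_SIZE is 32 bytes (Tahoe's SHA-256d digest size):

HASH_SIZE = 32  # assumed digest size, matching Tahoe's SHA-256d hashes

def next_power_of_two(n):
    power = 1
    while power < n:
        power *= 2
    return power

num_segments = 5
effective_segments = next_power_of_two(num_segments)   # 8 leaves
tree_nodes = 2 * effective_segments - 1                # 15 nodes in the tree
segment_hash_size = tree_nodes * HASH_SIZE
print(segment_hash_size)                               # 480 bytes reserved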
@@ -137,7 +138,7 @@ class WriteBucketProxy(object):
def get_allocated_size(self):
return (self._offsets['uri_extension'] + self.fieldsize +
self._uri_extension_size_max)
self._uri_extension_size)
def _create_offsets(self, block_size, data_size):
if block_size >= 2**32 or data_size >= 2**32:
@@ -195,6 +196,14 @@ class WriteBucketProxy(object):
return self._write(offset, data)
def put_crypttext_hashes(self, hashes):
# plaintext_hash_tree precedes crypttext_hash_tree. It is not used, and
# so is not explicitly written, but we need to write everything, so
# fill it in with nulls.
d = self._write(self._offsets['plaintext_hash_tree'], b"\x00" * self._segment_hash_size)
d.addCallback(lambda _: self._really_put_crypttext_hashes(hashes))
return d
def _really_put_crypttext_hashes(self, hashes):
offset = self._offsets['crypttext_hash_tree']
assert isinstance(hashes, list)
data = b"".join(hashes)
@@ -233,8 +242,7 @@ class WriteBucketProxy(object):
def put_uri_extension(self, data):
offset = self._offsets['uri_extension']
assert isinstance(data, bytes)
precondition(len(data) <= self._uri_extension_size_max,
len(data), self._uri_extension_size_max)
precondition(len(data) == self._uri_extension_size)
length = struct.pack(self.fieldstruct, len(data))
return self._write(offset, length+data)
@@ -244,11 +252,12 @@ class WriteBucketProxy(object):
# would reduce the foolscap CPU overhead per share, but wouldn't
# reduce the number of round trips, so it might not be worth the
# effort.
self._written_bytes += len(data)
return self._pipeline.add(len(data),
self._rref.callRemote, "write", offset, data)
def close(self):
assert self._written_bytes == self.get_allocated_size(), f"{self._written_bytes} != {self.get_allocated_size()}"
d = self._pipeline.add(0, self._rref.callRemote, "close")
d.addCallback(lambda ign: self._pipeline.flush())
return d
@@ -303,8 +312,6 @@ class WriteBucketProxy_v2(WriteBucketProxy):
@implementer(IStorageBucketReader)
class ReadBucketProxy(object):
MAX_UEB_SIZE = 2000 # actual size is closer to 419, but varies by a few bytes
def __init__(self, rref, server, storage_index):
self._rref = rref
self._server = server
@@ -332,11 +339,6 @@ class ReadBucketProxy(object):
# TODO: for small shares, read the whole bucket in _start()
d = self._fetch_header()
d.addCallback(self._parse_offsets)
# XXX The following two callbacks implement a slightly faster/nicer
# way to get the ueb and sharehashtree, but it requires that the
# storage server be >= v1.3.0.
# d.addCallback(self._fetch_sharehashtree_and_ueb)
# d.addCallback(self._parse_sharehashtree_and_ueb)
def _fail_waiters(f):
self._ready.fire(f)
def _notify_waiters(result):
@@ -381,29 +383,6 @@ class ReadBucketProxy(object):
self._offsets[field] = offset
return self._offsets
def _fetch_sharehashtree_and_ueb(self, offsets):
sharehashtree_size = offsets['uri_extension'] - offsets['share_hashes']
return self._read(offsets['share_hashes'],
self.MAX_UEB_SIZE+sharehashtree_size)
def _parse_sharehashtree_and_ueb(self, data):
sharehashtree_size = self._offsets['uri_extension'] - self._offsets['share_hashes']
if len(data) < sharehashtree_size:
raise LayoutInvalid("share hash tree truncated -- should have at least %d bytes -- not %d" % (sharehashtree_size, len(data)))
if sharehashtree_size % (2+HASH_SIZE) != 0:
raise LayoutInvalid("share hash tree malformed -- should have an even multiple of %d bytes -- not %d" % (2+HASH_SIZE, sharehashtree_size))
self._share_hashes = []
for i in range(0, sharehashtree_size, 2+HASH_SIZE):
hashnum = struct.unpack(">H", data[i:i+2])[0]
hashvalue = data[i+2:i+2+HASH_SIZE]
self._share_hashes.append( (hashnum, hashvalue) )
i = self._offsets['uri_extension']-self._offsets['share_hashes']
if len(data) < i+self._fieldsize:
raise LayoutInvalid("not enough bytes to encode URI length -- should be at least %d bytes long, not %d " % (i+self._fieldsize, len(data),))
length = struct.unpack(self._fieldstruct, data[i:i+self._fieldsize])[0]
self._ueb_data = data[i+self._fieldsize:i+self._fieldsize+length]
def _get_block_data(self, unused, blocknum, blocksize, thisblocksize):
offset = self._offsets['data'] + blocknum * blocksize
return self._read(offset, thisblocksize)
@@ -446,20 +425,18 @@ class ReadBucketProxy(object):
else:
return defer.succeed([])
def _get_share_hashes(self, unused=None):
if hasattr(self, '_share_hashes'):
return self._share_hashes
return self._get_share_hashes_the_old_way()
def get_share_hashes(self):
d = self._start_if_needed()
d.addCallback(self._get_share_hashes)
return d
def _get_share_hashes_the_old_way(self):
def _get_share_hashes(self, _ignore):
""" Tahoe storage servers < v1.3.0 would return an error if you tried
to read past the end of the share, so we need to use the offset and
read just that much."""
read just that much.
The HTTP-based storage protocol also doesn't like reading past the end.
"""
offset = self._offsets['share_hashes']
size = self._offsets['uri_extension'] - offset
if size % (2+HASH_SIZE) != 0:
@@ -477,32 +454,29 @@ class ReadBucketProxy(object):
d.addCallback(_unpack_share_hashes)
return d
def _get_uri_extension_the_old_way(self, unused=None):
def _get_uri_extension(self, unused=None):
""" Tahoe storage servers < v1.3.0 would return an error if you tried
to read past the end of the share, so we need to fetch the UEB size
and then read just that much."""
and then read just that much.
The HTTP-based storage protocol also doesn't like reading past the end.
"""
offset = self._offsets['uri_extension']
d = self._read(offset, self._fieldsize)
def _got_length(data):
if len(data) != self._fieldsize:
raise LayoutInvalid("not enough bytes to encode URI length -- should be %d bytes long, not %d " % (self._fieldsize, len(data),))
length = struct.unpack(self._fieldstruct, data)[0]
if length >= 2**31:
# URI extension blocks are around 419 bytes long, so this
# must be corrupted. Anyway, the foolscap interface schema
# for "read" will not allow >= 2**31 bytes length.
if length >= 2000:
# URI extension blocks are around 419 bytes long; in previous
# versions of the code 1000 was used as a default catchall. So
# 2000 or more must be corrupted.
raise RidiculouslyLargeURIExtensionBlock(length)
return self._read(offset+self._fieldsize, length)
d.addCallback(_got_length)
return d
def _get_uri_extension(self, unused=None):
if hasattr(self, '_ueb_data'):
return self._ueb_data
else:
return self._get_uri_extension_the_old_way()
def get_uri_extension(self):
d = self._start_if_needed()
d.addCallback(self._get_uri_extension)
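The read-a-length-then-read-exactly-that-much pattern used by _get_uri_extension above can be sketched standalone. This sketch operates on an in-memory bytes object instead of a remote share, and borrows the v1 layout's 4-byte big-endian length field and the 2000-byte sanity cap from the code above:

import struct

FIELDSTRUCT = ">L"  # 4-byte big-endian length field, as in the v1 layout
FIELDSIZE = struct.calcsize(FIELDSTRUCT)

def read_length_prefixed(data, offset, cap=2000):
    # Read the length field first, refusing truncated or absurd values,
    # then read exactly that many bytes -- never past the end.
    raw = data[offset:offset + FIELDSIZE]
    if len(raw) != FIELDSIZE:
        raise ValueError("not enough bytes to encode the length field")
    (length,) = struct.unpack(FIELDSTRUCT, raw)
    if length >= cap:
        raise ValueError("ridiculously large block: %d bytes" % length)
    return data[offset + FIELDSIZE:offset + FIELDSIZE + length]

blob = struct.pack(FIELDSTRUCT, 3) + b"abc"
assert read_length_prefixed(blob, 0) == b"abc"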

View File

@@ -242,31 +242,26 @@ class UploadResults(object):
def get_verifycapstr(self):
return self._verifycapstr
# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
EXTENSION_SIZE = 1000
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.
def pretty_print_shnum_to_servers(s):
return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.items() ])
class ServerTracker(object):
def __init__(self, server,
sharesize, blocksize, num_segments, num_share_hashes,
storage_index,
bucket_renewal_secret, bucket_cancel_secret):
bucket_renewal_secret, bucket_cancel_secret,
uri_extension_size):
self._server = server
self.buckets = {} # k: shareid, v: IRemoteBucketWriter
self.sharesize = sharesize
self.uri_extension_size = uri_extension_size
wbp = layout.make_write_bucket_proxy(None, None, sharesize,
blocksize, num_segments,
num_share_hashes,
EXTENSION_SIZE)
uri_extension_size)
self.wbp_class = wbp.__class__ # to create more of them
self.allocated_size = wbp.get_allocated_size()
self.blocksize = blocksize
@@ -314,7 +309,7 @@ class ServerTracker(object):
self.blocksize,
self.num_segments,
self.num_share_hashes,
EXTENSION_SIZE)
self.uri_extension_size)
b[sharenum] = bp
self.buckets.update(b)
return (alreadygot, set(b.keys()))
@@ -487,7 +482,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
def get_shareholders(self, storage_broker, secret_holder,
storage_index, share_size, block_size,
num_segments, total_shares, needed_shares,
min_happiness):
min_happiness, uri_extension_size):
"""
@return: (upload_trackers, already_serverids), where upload_trackers
is a set of ServerTracker instances that have agreed to hold
@@ -529,7 +524,8 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
# figure out how much space to ask for
wbp = layout.make_write_bucket_proxy(None, None,
share_size, 0, num_segments,
num_share_hashes, EXTENSION_SIZE)
num_share_hashes,
uri_extension_size)
allocated_size = wbp.get_allocated_size()
# decide upon the renewal/cancel secrets, to include them in the
@@ -554,7 +550,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
def _create_server_tracker(server, renew, cancel):
return ServerTracker(
server, share_size, block_size, num_segments, num_share_hashes,
storage_index, renew, cancel,
storage_index, renew, cancel, uri_extension_size
)
readonly_trackers, write_trackers = self._create_trackers(
@@ -1326,7 +1322,8 @@ class CHKUploader(object):
d = server_selector.get_shareholders(storage_broker, secret_holder,
storage_index,
share_size, block_size,
num_segments, n, k, desired)
num_segments, n, k, desired,
encoder.get_uri_extension_size())
def _done(res):
self._server_selection_elapsed = time.time() - server_selection_started
return res

View File

@@ -397,7 +397,9 @@ class BucketWriter(object):
"""
Write data at given offset, return whether the upload is complete.
"""
# Delay the timeout, since we received data:
# Delay the timeout, since we received data; if we get an
# AlreadyCancelled error, that means there's a bug in the client and
# write() was called after close().
self._timeout.reset(30 * 60)
start = self._clock.seconds()
precondition(not self.closed)
@@ -419,14 +421,18 @@ class BucketWriter(object):
self._already_written.set(True, offset, end)
self.ss.add_latency("write", self._clock.seconds() - start)
self.ss.count("write")
return self._is_finished()
# Return whether the whole thing has been written. See
# https://github.com/mlenzen/collections-extended/issues/169 and
# https://github.com/mlenzen/collections-extended/issues/172 for why
# it's done this way.
def _is_finished(self):
"""
Return whether the whole thing has been written.
"""
return sum([mr.stop - mr.start for mr in self._already_written.ranges()]) == self._max_size
def close(self):
# This can't actually be enabled, because it's not backwards compatible
# with old Foolscap clients.
# assert self._is_finished()
precondition(not self.closed)
self._timeout.cancel()
start = self._clock.seconds()
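Because write() may be called for non-contiguous, overlapping byte ranges in any order, the _is_finished() check above sums the merged ranges written so far and compares against the share's maximum size. A self-contained sketch of the same idea, using plain (start, stop) tuples in place of the collections-extended RangeMap the server relies on:

def merge_ranges(ranges):
    # Merge overlapping or adjacent half-open [start, stop) ranges.
    merged = []
    for start, stop in sorted(ranges):
        if merged and start <= merged[-1][1]:
            merged[-1] = (merged[-1][0], max(merged[-1][1], stop))
        else:
            merged.append((start, stop))
    return merged

def is_finished(written, max_size):
    return sum(stop - start for start, stop in merge_ranges(written)) == max_size

# Out-of-order, overlapping writes that together cover [0, 10):
assert is_finished([(5, 10), (0, 6)], 10)
# A gap at [4, 6) means the upload is not complete:
assert not is_finished([(0, 4), (6, 10)], 10)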

View File

@@ -14,11 +14,17 @@ if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
from zope.interface import implementer
from twisted.trial.unittest import TestCase
from twisted.internet.interfaces import IPushProducer, IPullProducer
from allmydata.util.consumer import MemoryConsumer
from .common import (
SyncTestCase,
)
from testtools.matchers import (
Equals,
)
@implementer(IPushProducer)
@implementer(IPullProducer)
@@ -50,7 +56,7 @@ class Producer(object):
self.consumer.unregisterProducer()
class MemoryConsumerTests(TestCase):
class MemoryConsumerTests(SyncTestCase):
"""Tests for MemoryConsumer."""
def test_push_producer(self):
@@ -60,14 +66,14 @@ class MemoryConsumerTests(TestCase):
consumer = MemoryConsumer()
producer = Producer(consumer, [b"abc", b"def", b"ghi"])
consumer.registerProducer(producer, True)
self.assertEqual(consumer.chunks, [b"abc"])
self.assertThat(consumer.chunks, Equals([b"abc"]))
producer.iterate()
producer.iterate()
self.assertEqual(consumer.chunks, [b"abc", b"def", b"ghi"])
self.assertEqual(consumer.done, False)
self.assertThat(consumer.chunks, Equals([b"abc", b"def", b"ghi"]))
self.assertFalse(consumer.done)
producer.iterate()
self.assertEqual(consumer.chunks, [b"abc", b"def", b"ghi"])
self.assertEqual(consumer.done, True)
self.assertThat(consumer.chunks, Equals([b"abc", b"def", b"ghi"]))
self.assertTrue(consumer.done)
def test_pull_producer(self):
"""
@@ -76,8 +82,8 @@ class MemoryConsumerTests(TestCase):
consumer = MemoryConsumer()
producer = Producer(consumer, [b"abc", b"def", b"ghi"])
consumer.registerProducer(producer, False)
self.assertEqual(consumer.chunks, [b"abc", b"def", b"ghi"])
self.assertEqual(consumer.done, True)
self.assertThat(consumer.chunks, Equals([b"abc", b"def", b"ghi"]))
self.assertTrue(consumer.done)
# download_to_data() is effectively tested by some of the filenode tests, e.g.
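The switch from assertEqual to testtools-style matchers used throughout these tests follows a single pattern. A minimal sketch, assuming only that testtools is importable (SyncTestCase in the diff is a testtools-based base class):

from testtools import TestCase
from testtools.matchers import Equals

class MatcherExample(TestCase):
    def test_equality(self):
        # assertThat pairs a value with a matcher; Equals produces an
        # equality matcher with richer failure output than assertEqual.
        self.assertThat(2 + 2, Equals(4))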

View File

@@ -251,6 +251,12 @@ class Verifier(GridTestMixin, unittest.TestCase, RepairTestMixin):
self.judge_invisible_corruption)
def test_corrupt_ueb(self):
# Note that in some rare situations this might fail, specifically if
# the length of the UEB is corrupted to be a value that is bigger than
# the size but less than 2000, it might not get caught... But that's
# mostly because in that case it doesn't meaningfully corrupt it. See
# _get_uri_extension_the_old_way() in layout.py for where the 2000
# number comes from.
self.basedir = "repairer/Verifier/corrupt_ueb"
return self._help_test_verify(common._corrupt_uri_extension,
self.judge_invisible_corruption)

View File

@@ -463,7 +463,7 @@ class BucketProxy(unittest.TestCase):
block_size=10,
num_segments=5,
num_share_hashes=3,
uri_extension_size_max=500)
uri_extension_size=500)
self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)
def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
@@ -494,7 +494,7 @@ class BucketProxy(unittest.TestCase):
block_size=25,
num_segments=4,
num_share_hashes=3,
uri_extension_size_max=len(uri_extension))
uri_extension_size=len(uri_extension))
d = bp.put_header()
d.addCallback(lambda res: bp.put_block(0, b"a"*25))

View File

@@ -46,9 +46,10 @@ from hypothesis.strategies import (
binary,
)
from testtools import (
TestCase,
from .common import (
SyncTestCase,
)
from testtools.matchers import (
Always,
Equals,
@@ -61,7 +62,7 @@ from testtools.twistedsupport import (
)
class FakeWebTest(TestCase):
class FakeWebTest(SyncTestCase):
"""
Test the WebUI verified-fakes infrastructure
"""

View File

@@ -983,7 +983,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
num_segments = encoder.get_param("num_segments")
d = selector.get_shareholders(broker, sh, storage_index,
share_size, block_size, num_segments,
10, 3, 4)
10, 3, 4, encoder.get_uri_extension_size())
def _have_shareholders(upload_trackers_and_already_servers):
(upload_trackers, already_servers) = upload_trackers_and_already_servers
assert servers_to_break <= len(upload_trackers)