from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
import json
import time
import os
import struct
from allmydata.storage.crawler import (
ShareCrawler,
_confirm_json_format,
_convert_cycle_data,
_dump_json_to_file,
)
from allmydata.storage.shares import get_share_file
from allmydata.storage.common import UnknownMutableContainerVersionError, \
UnknownImmutableContainerVersionError
from twisted.python import log as twlog
from twisted.python.filepath import FilePath
def _convert_pickle_state_to_json(state):
"""
Convert a pickle-serialized crawler-history state to the new JSON
format.
:param dict state: the pickled state
:return dict: the state in the JSON form
"""
return {
str(k): _convert_cycle_data(v)
for k, v in state.items()
}
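# A minimal sketch of the conversion above (hypothetical input; the exact
# shape of each converted cycle dict depends on _convert_cycle_data, imported
# from allmydata.storage.crawler, which is assumed here to return a JSON-safe
# copy of its argument):
#
#   pickled = {0: {"corrupt-shares": [], "lease-age-histogram": {}}}
#   _convert_pickle_state_to_json(pickled)
#   # => {"0": <JSON-safe copy>}   (integer cycle keys become strings)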
class _HistorySerializer(object):
"""
Serialize the 'history' file of the lease-crawler state. This is
"storage/lease_checker.history" for the pickle or
"storage/lease_checker.history.json" for the new JSON format.
"""
def __init__(self, history_path):
self._path = _confirm_json_format(FilePath(history_path))
if not self._path.exists():
_dump_json_to_file({}, self._path)
def load(self):
"""
Deserialize the existing data.
:return dict: the existing history state
"""
with self._path.open("rb") as f:
history = json.load(f)
return history
def save(self, new_history):
"""
Serialize the existing data as JSON.
"""
_dump_json_to_file(new_history, self._path)
return None
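# A usage sketch for _HistorySerializer (hypothetical path): load() and
# save() round-trip the entire history dict, so callers read, mutate, and
# write back the whole mapping.
#
#   serializer = _HistorySerializer("storage/lease_checker.history.json")
#   history = serializer.load()            # {} on first use
#   history["1"] = {"expiration-enabled": False}
#   serializer.save(history)
#   assert serializer.load() == history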
class LeaseCheckingCrawler(ShareCrawler):
"""I examine the leases on all shares, determining which are still valid
and which have expired. I can remove the expired leases (if so
configured), and the share will be deleted when the last lease is
removed.
I collect statistics on the leases and make these available to a web
status page, including::
Space recovered during this cycle-so-far:
actual (only if expiration_enabled=True):
num-buckets, num-shares, sum of share sizes, real disk usage
('real disk usage' means we use stat(fn).st_blocks*512 and include any
space used by the directory)
what it would have been with the original lease expiration time
what it would have been with our configured expiration time
Prediction of space that will be recovered during the rest of this cycle
Prediction of space that will be recovered by the entire current cycle.
    Space recovered during the last 10 cycles  <-- saved in the JSON history file
    Shares/buckets examined:
        this cycle-so-far
        prediction of rest of cycle
        during last 10 cycles  <-- history file
    start/finish time of last 10 cycles  <-- history file
    expiration time used for last 10 cycles  <-- history file
    Histogram of leases-per-share:
        this-cycle-to-date
        last 10 cycles  <-- history file
    Histogram of lease ages, buckets = 1day
        cycle-to-date
        last 10 cycles  <-- history file
All cycle-to-date values remain valid until the start of the next cycle.
"""
slow_start = 360 # wait 6 minutes after startup
minimum_cycle_time = 12*60*60 # not more than twice per day
def __init__(self, server, statefile, historyfile,
expiration_enabled, mode,
override_lease_duration, # used if expiration_mode=="age"
cutoff_date, # used if expiration_mode=="cutoff-date"
sharetypes):
self._history_serializer = _HistorySerializer(historyfile)
self.expiration_enabled = expiration_enabled
self.mode = mode
self.override_lease_duration = None
self.cutoff_date = None
if self.mode == "age":
assert isinstance(override_lease_duration, (int, type(None)))
self.override_lease_duration = override_lease_duration # seconds
elif self.mode == "cutoff-date":
            assert isinstance(cutoff_date, int) # seconds-since-epoch; a None value fails this check
self.cutoff_date = cutoff_date
else:
raise ValueError("GC mode '%s' must be 'age' or 'cutoff-date'" % mode)
self.sharetypes_to_expire = sharetypes
ShareCrawler.__init__(self, server, statefile)
def add_initial_state(self):
# we fill ["cycle-to-date"] here (even though they will be reset in
# self.started_cycle) just in case someone grabs our state before we
# get started: unit tests do this
so_far = self.create_empty_cycle_dict()
self.state.setdefault("cycle-to-date", so_far)
# in case we upgrade the code while a cycle is in progress, update
# the keys individually
for k in so_far:
self.state["cycle-to-date"].setdefault(k, so_far[k])
def create_empty_cycle_dict(self):
recovered = self.create_empty_recovered_dict()
so_far = {"corrupt-shares": [],
"space-recovered": recovered,
"lease-age-histogram": {}, # (minage,maxage)->count
"leases-per-share-histogram": {}, # leasecount->numshares
}
return so_far
def create_empty_recovered_dict(self):
recovered = {}
for a in ("actual", "original", "configured", "examined"):
for b in ("buckets", "shares", "sharebytes", "diskbytes"):
recovered[a+"-"+b] = 0
recovered[a+"-"+b+"-mutable"] = 0
recovered[a+"-"+b+"-immutable"] = 0
return recovered
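    # For reference, the dict built above holds 48 zeroed counters: four
    # categories ("actual", "original", "configured", "examined") times four
    # units ("buckets", "shares", "sharebytes", "diskbytes") times three
    # share-type suffixes ("", "-mutable", "-immutable"), e.g.
    #   recovered["examined-buckets"] == 0
    #   recovered["actual-diskbytes-immutable"] == 0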
def started_cycle(self, cycle):
self.state["cycle-to-date"] = self.create_empty_cycle_dict()
def stat(self, fn):
return os.stat(fn)
def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
bucketdir = os.path.join(prefixdir, storage_index_b32)
s = self.stat(bucketdir)
would_keep_shares = []
wks = None
for fn in os.listdir(bucketdir):
try:
shnum = int(fn)
except ValueError:
continue # non-numeric means not a sharefile
sharefile = os.path.join(bucketdir, fn)
try:
wks = self.process_share(sharefile)
except (UnknownMutableContainerVersionError,
UnknownImmutableContainerVersionError,
struct.error):
twlog.msg("lease-checker error processing %s" % sharefile)
twlog.err()
which = [storage_index_b32, shnum]
self.state["cycle-to-date"]["corrupt-shares"].append(which)
wks = (1, 1, 1, "unknown")
would_keep_shares.append(wks)
sharetype = None
if wks:
# use the last share's sharetype as the buckettype
sharetype = wks[3]
rec = self.state["cycle-to-date"]["space-recovered"]
self.increment(rec, "examined-buckets", 1)
if sharetype:
self.increment(rec, "examined-buckets-"+sharetype, 1)
del wks
try:
bucket_diskbytes = s.st_blocks * 512
except AttributeError:
bucket_diskbytes = 0 # no stat().st_blocks on windows
if sum([wks[0] for wks in would_keep_shares]) == 0:
self.increment_bucketspace("original", bucket_diskbytes, sharetype)
if sum([wks[1] for wks in would_keep_shares]) == 0:
self.increment_bucketspace("configured", bucket_diskbytes, sharetype)
if sum([wks[2] for wks in would_keep_shares]) == 0:
self.increment_bucketspace("actual", bucket_diskbytes, sharetype)
def process_share(self, sharefilename):
# first, find out what kind of a share it is
sf = get_share_file(sharefilename)
sharetype = sf.sharetype
now = time.time()
s = self.stat(sharefilename)
num_leases = 0
num_valid_leases_original = 0
num_valid_leases_configured = 0
expired_leases_configured = []
for li in sf.get_leases():
num_leases += 1
original_expiration_time = li.get_expiration_time()
grant_renew_time = li.get_grant_renew_time_time()
age = li.get_age()
self.add_lease_age_to_histogram(age)
# expired-or-not according to original expiration time
if original_expiration_time > now:
num_valid_leases_original += 1
# expired-or-not according to our configured age limit
expired = False
if self.mode == "age":
age_limit = original_expiration_time
if self.override_lease_duration is not None:
age_limit = self.override_lease_duration
if age > age_limit:
expired = True
else:
assert self.mode == "cutoff-date"
if grant_renew_time < self.cutoff_date:
expired = True
if sharetype not in self.sharetypes_to_expire:
expired = False
if expired:
expired_leases_configured.append(li)
else:
num_valid_leases_configured += 1
so_far = self.state["cycle-to-date"]
self.increment(so_far["leases-per-share-histogram"], str(num_leases), 1)
self.increment_space("examined", s, sharetype)
would_keep_share = [1, 1, 1, sharetype]
if self.expiration_enabled:
for li in expired_leases_configured:
sf.cancel_lease(li.cancel_secret)
if num_valid_leases_original == 0:
would_keep_share[0] = 0
self.increment_space("original", s, sharetype)
if num_valid_leases_configured == 0:
would_keep_share[1] = 0
self.increment_space("configured", s, sharetype)
if self.expiration_enabled:
would_keep_share[2] = 0
self.increment_space("actual", s, sharetype)
return would_keep_share
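    # The returned list reads [keep-per-original, keep-per-configured,
    # keep-per-actual, sharetype]. For example (hypothetical), a share whose
    # every lease fails both expiration tests on a server with
    # expiration_enabled=True comes back as [0, 0, 0, "mutable"]: its
    # expired leases were cancelled above, and the share itself is reclaimed
    # once the last lease is gone.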
def increment_space(self, a, s, sharetype):
sharebytes = s.st_size
try:
            # note that stat(2) says st_blocks is measured in 512-byte
            # blocks, and that st_blksize is the "optimal file sys I/O ops
            # blocksize", which is independent of the block size that
            # st_blocks uses.
diskbytes = s.st_blocks * 512
except AttributeError:
            # the docs say that st_blocks is only on Linux; it also appears
            # on macOS, but it isn't available on Windows.
diskbytes = sharebytes
so_far_sr = self.state["cycle-to-date"]["space-recovered"]
self.increment(so_far_sr, a+"-shares", 1)
self.increment(so_far_sr, a+"-sharebytes", sharebytes)
self.increment(so_far_sr, a+"-diskbytes", diskbytes)
if sharetype:
self.increment(so_far_sr, a+"-shares-"+sharetype, 1)
self.increment(so_far_sr, a+"-sharebytes-"+sharetype, sharebytes)
self.increment(so_far_sr, a+"-diskbytes-"+sharetype, diskbytes)
def increment_bucketspace(self, a, bucket_diskbytes, sharetype):
rec = self.state["cycle-to-date"]["space-recovered"]
self.increment(rec, a+"-diskbytes", bucket_diskbytes)
self.increment(rec, a+"-buckets", 1)
if sharetype:
self.increment(rec, a+"-diskbytes-"+sharetype, bucket_diskbytes)
self.increment(rec, a+"-buckets-"+sharetype, 1)
def increment(self, d, k, delta=1):
if k not in d:
d[k] = 0
d[k] += delta
def add_lease_age_to_histogram(self, age):
bucket_interval = 24*60*60
bucket_number = int(age/bucket_interval)
bucket_start = bucket_number * bucket_interval
bucket_end = bucket_start + bucket_interval
k = (bucket_start, bucket_end)
self.increment(self.state["cycle-to-date"]["lease-age-histogram"], k, 1)
def convert_lease_age_histogram(self, lah):
# convert { (minage,maxage) : count } into [ (minage,maxage,count) ]
# since the former is not JSON-safe (JSON dictionaries must have
# string keys).
json_safe_lah = []
for k in sorted(lah):
(minage,maxage) = k
json_safe_lah.append( (minage, maxage, lah[k]) )
return json_safe_lah
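    # For example, {(0, 86400): 3, (86400, 172800): 1} becomes
    # [(0, 86400, 3), (86400, 172800, 1)], sorted by bucket start, which
    # JSON can serialize directly.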
def finished_cycle(self, cycle):
# add to our history state, prune old history
h = {}
start = self.state["current-cycle-start-time"]
now = time.time()
h["cycle-start-finish-times"] = [start, now]
h["expiration-enabled"] = self.expiration_enabled
h["configured-expiration-mode"] = [
self.mode,
self.override_lease_duration,
self.cutoff_date,
self.sharetypes_to_expire,
]
s = self.state["cycle-to-date"]
# state["lease-age-histogram"] is a dictionary (mapping
# (minage,maxage) tuple to a sharecount), but we report
# self.get_state()["lease-age-histogram"] as a list of
# (min,max,sharecount) tuples, because JSON can handle that better.
# We record the list-of-tuples form into the history for the same
# reason.
lah = self.convert_lease_age_histogram(s["lease-age-histogram"])
h["lease-age-histogram"] = lah
h["leases-per-share-histogram"] = s["leases-per-share-histogram"].copy()
h["corrupt-shares"] = s["corrupt-shares"][:]
# note: if ["shares-recovered"] ever acquires an internal dict, this
# copy() needs to become a deepcopy
h["space-recovered"] = s["space-recovered"].copy()
history = self._history_serializer.load()
history[str(cycle)] = h
while len(history) > 10:
oldcycles = sorted(int(k) for k in history.keys())
del history[str(oldcycles[0])]
self._history_serializer.save(history)
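    # A sketch of the pruning above (hypothetical cycle numbers): once the
    # history holds entries "0" through "10", the loop deletes "0" (the
    # smallest key when compared as an integer), leaving the ten most
    # recent cycles in the history file.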
def get_state(self):
"""In addition to the crawler state described in
ShareCrawler.get_state(), I return the following keys which are
specific to the lease-checker/expirer. Note that the non-history keys
(with 'cycle' in their names) are only present if a cycle is
        currently running. If the crawler is between cycles, it is appropriate
to show the latest item in the 'history' key instead. Also note that
each history item has all the data in the 'cycle-to-date' value, plus
cycle-start-finish-times.
cycle-to-date:
expiration-enabled
configured-expiration-mode
lease-age-histogram (list of (minage,maxage,sharecount) tuples)
leases-per-share-histogram
corrupt-shares (list of (si_b32,shnum) tuples, minimal verification)
space-recovered
estimated-remaining-cycle:
# Values may be None if not enough data has been gathered to
# produce an estimate.
space-recovered
estimated-current-cycle:
# cycle-to-date plus estimated-remaining. Values may be None if
# not enough data has been gathered to produce an estimate.
space-recovered
history: maps cyclenum to a dict with the following keys:
cycle-start-finish-times
expiration-enabled
configured-expiration-mode
lease-age-histogram
leases-per-share-histogram
corrupt-shares
space-recovered
The 'space-recovered' structure is a dictionary with the following
keys:
# 'examined' is what was looked at
examined-buckets, examined-buckets-mutable, examined-buckets-immutable
examined-shares, -mutable, -immutable
examined-sharebytes, -mutable, -immutable
examined-diskbytes, -mutable, -immutable
# 'actual' is what was actually deleted
actual-buckets, -mutable, -immutable
actual-shares, -mutable, -immutable
actual-sharebytes, -mutable, -immutable
actual-diskbytes, -mutable, -immutable
# would have been deleted, if the original lease timer was used
original-buckets, -mutable, -immutable
original-shares, -mutable, -immutable
original-sharebytes, -mutable, -immutable
original-diskbytes, -mutable, -immutable
# would have been deleted, if our configured max_age was used
configured-buckets, -mutable, -immutable
configured-shares, -mutable, -immutable
configured-sharebytes, -mutable, -immutable
configured-diskbytes, -mutable, -immutable
"""
progress = self.get_progress()
state = ShareCrawler.get_state(self) # does a shallow copy
state["history"] = self._history_serializer.load()
if not progress["cycle-in-progress"]:
del state["cycle-to-date"]
return state
so_far = state["cycle-to-date"].copy()
state["cycle-to-date"] = so_far
lah = so_far["lease-age-histogram"]
so_far["lease-age-histogram"] = self.convert_lease_age_histogram(lah)
so_far["expiration-enabled"] = self.expiration_enabled
so_far["configured-expiration-mode"] = [
self.mode,
self.override_lease_duration,
self.cutoff_date,
self.sharetypes_to_expire,
]
so_far_sr = so_far["space-recovered"]
remaining_sr = {}
remaining = {"space-recovered": remaining_sr}
cycle_sr = {}
cycle = {"space-recovered": cycle_sr}
if progress["cycle-complete-percentage"] > 0.0:
pc = progress["cycle-complete-percentage"] / 100.0
m = (1-pc)/pc
for a in ("actual", "original", "configured", "examined"):
for b in ("buckets", "shares", "sharebytes", "diskbytes"):
for c in ("", "-mutable", "-immutable"):
k = a+"-"+b+c
remaining_sr[k] = m * so_far_sr[k]
cycle_sr[k] = so_far_sr[k] + remaining_sr[k]
else:
for a in ("actual", "original", "configured", "examined"):
for b in ("buckets", "shares", "sharebytes", "diskbytes"):
for c in ("", "-mutable", "-immutable"):
k = a+"-"+b+c
remaining_sr[k] = None
cycle_sr[k] = None
state["estimated-remaining-cycle"] = remaining
state["estimated-current-cycle"] = cycle
return state
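    # A worked example of the extrapolation above: at 25% completion,
    # pc = 0.25 and m = (1 - 0.25) / 0.25 == 3.0, so every
    # estimated-remaining-cycle counter is three times its cycle-to-date
    # value and every estimated-current-cycle counter is four times it.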