refactor to use serializers / pickle->json upgraders

This commit is contained in:
meejah 2021-10-25 13:15:38 -06:00
parent fa6950f08d
commit f81e4e2d25
4 changed files with 212 additions and 24 deletions

View File

@ -19,12 +19,145 @@ import json
import struct
from twisted.internet import reactor
from twisted.application import service
from twisted.python.filepath import FilePath
from allmydata.storage.common import si_b2a
from allmydata.util import fileutil
class TimeSliceExceeded(Exception):
    # Raised when a crawler work slice runs past its allotted
    # wall-clock time budget (the crawler then yields the reactor).
    pass
def _convert_pickle_state_to_json(state):
"""
:param dict state: the pickled state
:return dict: the state in the JSON form
"""
# ["cycle-to-date"]["corrupt-shares"] from 2-tuple to list
# ["leases-per-share-histogram"] gets str keys instead of int
# ["cycle-start-finish-times"] from 2-tuple to list
# ["configured-expiration-mode"] from 4-tuple to list
# ["history"] keys are strings
if state["version"] != 1:
raise ValueError(
"Unknown version {version} in pickle state".format(**state)
)
def convert_lpsh(value):
return {
str(k): v
for k, v in value.items()
}
def convert_cem(value):
# original is a 4-tuple, with the last element being a 2-tuple
# .. convert both to lists
return [
value[0],
value[1],
value[2],
list(value[3]),
]
def convert_history(value):
print("convert history")
print(value)
return {
str(k): v
for k, v in value
}
converters = {
"cycle-to-date": list,
"leases-per-share-histogram": convert_lpsh,
"cycle-starte-finish-times": list,
"configured-expiration-mode": convert_cem,
"history": convert_history,
}
def convert_value(key, value):
converter = converters.get(key, None)
if converter is None:
return value
return converter(value)
new_state = {
k: convert_value(k, v)
for k, v in state.items()
}
return new_state
def _maybe_upgrade_pickle_to_json(state_path, convert_pickle):
    """
    Convert a legacy pickle-format state file to JSON, if it has not
    been converted already.

    :param FilePath state_path: the filepath to ensure is json

    :param Callable[dict] convert_pickle: function to change
        pickle-style state into JSON-style state

    :returns unicode: the local path where the state is stored

    If this state path is JSON, simply return it.

    If this state is pickle, convert to the JSON format and return the
    JSON path.
    """
    if state_path.path.endswith(".json"):
        return state_path.path

    json_state_path = state_path.siblingExtension(".json")

    # if there's no file there at all, we're done because there's
    # nothing to upgrade
    if not state_path.exists():
        return json_state_path.path

    # upgrade the pickle data to JSON. pickle is imported locally
    # because it is only needed for this one-time legacy migration.
    import pickle
    with state_path.open("r") as f:  # FilePath.open() is always binary-mode
        state = pickle.load(f)
    state = convert_pickle(state)
    # FilePath I/O is binary, so serialize to bytes explicitly instead
    # of json.dump()-ing text into a binary file
    json_state_path.setContent(json.dumps(state).encode("utf-8"))

    # we've written the JSON, delete the pickle
    state_path.remove()
    return json_state_path.path
class _LeaseStateSerializer(object):
    """
    Read and write state for LeaseCheckingCrawler. This understands
    how to read the legacy pickle format files and upgrade them to the
    new JSON format (which will occur automatically).
    """

    def __init__(self, state_path):
        # migrate any legacy pickle state to JSON before first use;
        # afterwards _path always points at the JSON file
        self._path = FilePath(
            _maybe_upgrade_pickle_to_json(
                FilePath(state_path),
                _convert_pickle_state_to_json,
            )
        )

    def load(self):
        """
        Deserialize the current state.

        :return dict: the stored state
        """
        with self._path.open("r") as f:
            return json.load(f)

    def save(self, data):
        """
        Serialize ``data`` as JSON, atomically replacing the state file.

        :param data: JSON-serializable state

        :returns: None
        """
        tmpfile = self._path.siblingExtension(".tmp")
        # FilePath I/O is binary, so serialize to bytes explicitly
        tmpfile.setContent(json.dumps(data).encode("utf-8"))
        # write-then-rename so a crash can't leave a half-written file
        fileutil.move_into_place(tmpfile.path, self._path.path)
        return None
class ShareCrawler(service.MultiService):
"""A ShareCrawler subclass is attached to a StorageServer, and
periodically walks all of its shares, processing each one in some
@ -87,7 +220,7 @@ class ShareCrawler(service.MultiService):
self.allowed_cpu_percentage = allowed_cpu_percentage
self.server = server
self.sharedir = server.sharedir
self.statefile = statefile
self._state_serializer = _LeaseStateSerializer(statefile)
self.prefixes = [si_b2a(struct.pack(">H", i << (16-10)))[:2]
for i in range(2**10)]
if PY3:
@ -210,8 +343,7 @@ class ShareCrawler(service.MultiService):
# of the last bucket to be processed, or
# None if we are sleeping between cycles
try:
with open(self.statefile, "rb") as f:
state = json.load(f)
state = self._state_serializer.load()
except Exception:
state = {"version": 1,
"last-cycle-finished": None,
@ -247,14 +379,11 @@ class ShareCrawler(service.MultiService):
else:
last_complete_prefix = self.prefixes[lcpi]
self.state["last-complete-prefix"] = last_complete_prefix
tmpfile = self.statefile + ".tmp"
# Note: we use self.get_state() here because e.g
# LeaseCheckingCrawler stores non-JSON-able state in
# self.state() but converts it in self.get_state()
with open(tmpfile, "wb") as f:
json.dump(self.get_state(), f)
fileutil.move_into_place(tmpfile, self.statefile)
self._state_serializer.save(self.get_state())
def startService(self):
# arrange things to look like we were just sleeping, so

View File

@ -10,11 +10,72 @@ import json
import time
import os
import struct
from allmydata.storage.crawler import ShareCrawler
from allmydata.storage.crawler import (
ShareCrawler,
_maybe_upgrade_pickle_to_json,
)
from allmydata.storage.shares import get_share_file
from allmydata.storage.common import UnknownMutableContainerVersionError, \
UnknownImmutableContainerVersionError
from twisted.python import log as twlog
from twisted.python.filepath import FilePath
def _convert_pickle_state_to_json(state):
"""
:param dict state: the pickled state
:return dict: the state in the JSON form
"""
print("CONVERT", state)
for k, v in state.items():
print(k, v)
if state["version"] != 1:
raise ValueError(
"Unknown version {version} in pickle state".format(**state)
)
return state
class _HistorySerializer(object):
    """
    Serialize the 'history' file of the lease-crawler state. This is
    "storage/history.state" for the pickle or
    "storage/history.state.json" for the new JSON format.
    """

    def __init__(self, history_path):
        # migrate any legacy pickle history to JSON before first use;
        # afterwards _path always points at the JSON file
        self._path = FilePath(
            _maybe_upgrade_pickle_to_json(
                FilePath(history_path),
                _convert_pickle_state_to_json,
            )
        )
        if not self._path.exists():
            # start with an empty history. FilePath I/O is binary, so
            # write the encoded JSON bytes directly.
            self._path.setContent(b"{}")

    def read(self):
        """
        Deserialize the existing data.

        :return dict: the existing history state
        """
        with self._path.open("rb") as f:
            history = json.load(f)
        return history

    def write(self, new_history):
        """
        Serialize the given data as JSON, replacing any existing history.

        :param dict new_history: the history state to store

        :returns: None
        """
        # FilePath I/O is binary, so serialize to bytes explicitly
        self._path.setContent(json.dumps(new_history).encode("utf-8"))
        return None
class LeaseCheckingCrawler(ShareCrawler):
"""I examine the leases on all shares, determining which are still valid
@ -64,7 +125,8 @@ class LeaseCheckingCrawler(ShareCrawler):
override_lease_duration, # used if expiration_mode=="age"
cutoff_date, # used if expiration_mode=="cutoff-date"
sharetypes):
self.historyfile = historyfile
self._history_serializer = _HistorySerializer(historyfile)
##self.historyfile = historyfile
self.expiration_enabled = expiration_enabled
self.mode = mode
self.override_lease_duration = None
@ -92,12 +154,6 @@ class LeaseCheckingCrawler(ShareCrawler):
for k in so_far:
self.state["cycle-to-date"].setdefault(k, so_far[k])
# initialize history
if not os.path.exists(self.historyfile):
history = {} # cyclenum -> dict
with open(self.historyfile, "wb") as f:
json.dump(history, f)
def create_empty_cycle_dict(self):
recovered = self.create_empty_recovered_dict()
so_far = {"corrupt-shares": [],
@ -315,14 +371,12 @@ class LeaseCheckingCrawler(ShareCrawler):
# copy() needs to become a deepcopy
h["space-recovered"] = s["space-recovered"].copy()
with open(self.historyfile, "rb") as f:
history = json.load(f)
history = self._history_serializer.read()
history[str(cycle)] = h
while len(history) > 10:
oldcycles = sorted(int(k) for k in history.keys())
del history[str(oldcycles[0])]
with open(self.historyfile, "wb") as f:
json.dump(history, f)
self._history_serializer.write(history)
def get_state(self):
"""In addition to the crawler state described in
@ -391,9 +445,7 @@ class LeaseCheckingCrawler(ShareCrawler):
progress = self.get_progress()
state = ShareCrawler.get_state(self) # does a shallow copy
with open(self.historyfile, "rb") as f:
history = json.load(f)
state["history"] = history
state["history"] = self._history_serializer.read()
if not progress["cycle-in-progress"]:
del state["cycle-to-date"]

View File

@ -57,6 +57,7 @@ DEFAULT_RENEWAL_TIME = 31 * 24 * 60 * 60
@implementer(RIStorageServer, IStatsProducer)
class StorageServer(service.MultiService, Referenceable):
name = 'storage'
# only the tests change this to anything else
LeaseCheckerClass = LeaseCheckingCrawler
def __init__(self, storedir, nodeid, reserved_space=0,

View File

@ -25,14 +25,20 @@ from twisted.trial import unittest
from twisted.internet import defer
from twisted.application import service
from twisted.web.template import flattenString
from twisted.python.filepath import FilePath
from foolscap.api import fireEventually
from allmydata.util import fileutil, hashutil, base32, pollmixin
from allmydata.storage.common import storage_index_to_dir, \
UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
from allmydata.storage.server import StorageServer
from allmydata.storage.crawler import BucketCountingCrawler
from allmydata.storage.expirer import LeaseCheckingCrawler
from allmydata.storage.crawler import (
BucketCountingCrawler,
_LeaseStateSerializer,
)
from allmydata.storage.expirer import (
LeaseCheckingCrawler,
)
from allmydata.web.storage import (
StorageStatus,
StorageStatusElement,