From 81bed5ab3376393524373009da023752bb7d54e4 Mon Sep 17 00:00:00 2001 From: Andrew Bettison Date: Mon, 18 Feb 2013 17:24:01 +1030 Subject: [PATCH] More progress on Rhizome mirror script --- utilities/rhizome_mirrord | 320 ++++++++++++++++++++++++++------------ 1 file changed, 221 insertions(+), 99 deletions(-) diff --git a/utilities/rhizome_mirrord b/utilities/rhizome_mirrord index ac2c5eb9..7346a207 100755 --- a/utilities/rhizome_mirrord +++ b/utilities/rhizome_mirrord @@ -33,18 +33,25 @@ and the bundle ID separated by ':', in order to avoid collisions. """ import sys +import errno +import time +import os import os.path import re import argparse import subprocess import datetime +import fnmatch def main(): parser = argparse.ArgumentParser(description='Unpack Rhizome bundles into mirror directory.') parser.add_argument('--servald', dest='servald', default='servald', help='Path of servald executable') parser.add_argument('--servald-instance', dest='instancepath', default=os.environ.get('SERVALINSTANCE_PATH'), help='Path of servald instance directory') - parser.add_argument('--interval', dest='interval', type=int, default=0, help='Seconds to sleep between polling Rhizome') + parser.add_argument('--interval', dest='interval', type=float, default=0, help='Seconds to sleep between polling Rhizome') parser.add_argument('--mirror-dir', dest='mirror_dir', required=True, help='Path of directory to store extracted payloads') + parser.add_argument('--filter-name', dest='name_filter', help='Only mirror bundles whose names match given Glob pattern') + parser.add_argument('--expire-delay', dest='expire_delay', default=0, help='Keep bundles in mirror for this many seconds after no longer listed by Rhizome') + parser.add_argument('--paranoid', dest='paranoid', action='store_true', help='Continually check for and correct corrupted mirror contents') opts = parser.parse_args() if opts.instancepath is None: fatal('missing --servald-instance option') @@ -52,123 +59,237 @@ def main(): fatal('no executable servald') mirror = RhizomeMirror(opts) mirror.seed() - mirror.update_list() + while True: + mirror.list() + mirror.update() + mirror.expire() + if not opts.interval: + break + time.sleep(opts.interval) + sys.exit(0) class RhizomeMirror(object): def __init__(self, opts): self.opts = opts self.extracted = {} + self.available = None self.payloads_path = opts.mirror_dir self.manifests_path = os.path.join(self.payloads_path, '.manifests') + def manifest_path(self, manifest): + return os.path.join(self.manifests_path, manifest.stem()) + '.manifest' + + def payload_path(self, manifest): + return os.path.join(self.payloads_path, manifest.stem()) + manifest.suffix() + def seed(self): self.extracted = {} - os.makedirs(self.manifests_path) + try: + os.makedirs(self.manifests_path) + except OSError, e: + if e.errno != errno.EEXIST: + raise for manifest_name in os.listdir(self.manifests_path): manifest_path = os.path.join(self.manifests_path, manifest_name) if manifest_name.endswith('.manifest'): stem = os.path.splitext(manifest_name)[0] - manifest = RhizomeManifest().read(file(manifest_path)) - if manifest is not None: - if stem == manifest.stem(): - payload_path = os.path.join(self.payloads_path, stem) - if os.path.exists(payload_path): - payload_hash = servald_rhizome_hash(self.opts, payload_path) - if payload_hash == manifest.filehash: - log('seeded %r' % (stem,)) - self.extracted[stem] = manifest - continue - log('unlink %r' % (manifest_path,)) - try: - os.unlink(manifest_path) - except OSError, e: - error('cannot unlink %r - %s' % (manifest_path, e)) + manifest = RhizomeManifest.from_file(file(manifest_path)) + if manifest is not None and stem == manifest.stem(): + if self.sync(manifest): + log('seeded %r' % (stem,)) + self.extracted[stem] = manifest + continue + # Remove invalid manifests. + self.unlink(manifest_path) - def update_list(self): + def sync(self, manifest): + payload_path = self.payload_path(manifest) + if manifest.filesize == 0: + self.unlink(payload_path) + return True + elif os.path.exists(payload_path): + payload_hash = servald_rhizome_hash(self.opts, payload_path) + if payload_hash == manifest.filehash: + return True + else: + # Remove payload that does not match its manifest. + self.unlink(payload_path) + # Not synced -- have to extract the payload. + return False + + def filter(self, manifest): + if self.opts.name_filter: + if not fnmatch.fnmatch(manifest.name or '', self.opts.name_filter): + return False + return True + + def list(self): + self.available = None entries = servald_rhizome_list(self.opts) - for ent in entries: - manifest = RhizomeManifest().from_list_entry(ent) - if manifest is not None: - stem = manifest.stem() - extracted_manifest = self.extracted.get(stem) - if extracted_manifest is None or manifest.succeeds(extracted_manifest): - servald_rhizome_extract(self.opts, manifest.bid, stem) + if entries is not None: + self.available = {} + for ent in entries: + manifest = RhizomeManifest.from_list_entry(ent) + if manifest is not None and manifest.service == 'file': + stem = manifest.stem() + self.available[stem] = manifest + + def update(self): + if self.available is not None: + for stem, manifest in self.available.iteritems(): + manifest_path = self.manifest_path(manifest) + payload_path = self.payload_path(manifest) + if self.filter(manifest): + extracted_manifest = self.extracted.get(stem) + kwargs = {} + if extracted_manifest is None or manifest.succeeds(extracted_manifest): + kwargs['manifest_path'] = manifest_path + if manifest.filesize == 0: + self.unlink(payload_path) + else: + kwargs['payload_path'] = payload_path + elif manifest.id == extracted_manifest.id: + # Assume manifest and payload files are correct if present, unless + # in 'paranoid' mode. + if manifest.filesize == 0: + self.unlink(payload_path) + elif os.path.exists(payload_path): + if self.opts.paranoid: + payload_hash = servald_rhizome_hash(self.opts, payload_path) + if payload_hash != manifest.filehash: + kwargs['payload_path'] = payload_path + else: + kwargs['payload_path'] = payload_path + if os.path.exists(manifest_path): + self.touch(manifest_path) # Remember when this was last available + if self.opts.paranoid: + check_manifest = RhizomeManifest.from_file(file(manifest_path)) + if check_manifest is None or check_manifest != extracted_manifest: + kwargs['manifest_path'] = manifest_path + else: + kwargs['manifest_path'] = manifest_path + else: + # Ignore listed manifests with the same stem but different bundle ID; keep + # the already-extracted bundle (until expired). + pass + if kwargs: + res = servald_rhizome_extract(self.opts, manifest.id, **kwargs) + if res: + extracted_manifest = RhizomeManifest.from_file(file(manifest_path)) + if extracted_manifest is None or extracted_manifest.id != manifest.id or extracted_manifest.version != manifest.version: + error('invalid manifest extracted for bid=%s' % (manifest.id,)) + self.unlink(payload_path) + self.unlink(manifest_path) + self.extracted[stem] = None + else: + self.extracted[stem] = extracted_manifest + + def expire(self): + now = time.time() + if self.available is not None: + for stem, extracted_manifest in self.extracted.iteritems(): + if stem not in self.available: + manifest_path = self.manifest_path(manifest) + payload_path = self.payload_path(manifest) + if os.path.exists(manifest_path): + if self.mtime(manifest_path) + self.opts.expire_delay < now: + self.unlink(payload_path) + self.unlink(manifest_path) + self.extracted[stem] = None + else: + self.unlink(payload_path) + + def touch(self, path): + try: + open(path, "r+") + except OSError, e: + error('cannot touch %r - %s' % (path, e)) + + def unlink(self, path): + if os.path.exists(path): + log('unlink %r' % (path,)) + try: + os.unlink(path) + except OSError, e: + error('cannot unlink %r - %s' % (path, e)) + + def mtime(self, path): + try: + return os.stat(path).st_mtime + except OSError, e: + if e.errno != errno.ENOENT: + error('cannot stat %r - %s' % (path, e)) + return None class RhizomeManifest(object): - def __init__(self): - self._fields = {} - self.service = None - self.bid = None - self.version = None - self.filesize = None - self.filehash = None - self.name = None + def __init__(self, **fields): + self.service = str_nonempty(fields['service']) + self.id = bundle_id(fields['id']) + self.version = ulong(fields['version']) + self.filesize = ulong(fields['filesize']) if 'filesize' in fields else None + self.date = time_ms(fields['date']) if 'date' in fields else None + self.filehash = file_hash(fields['filehash']) if self.filesize or self.filesize is None else None + self.sender = subscriber_id(fields['sender']) if 'sender' in fields else None + self.recipient = subscriber_id(fields['recipient']) if 'recipient' in fields else None + self.name = str(fields['name']) if 'name' in fields else None + self._other = fields + + @staticmethod + def fieldname(text): + if text.isalnum(): + return text.lower() + raise ValueError('invalid literal for RhizomeManifest.fieldname(): %s' % (text,)) + + @classmethod + def is_fieldname(cls, text): + try: + cls.fieldname(text) + return True + except ValueError: + return False def stem(self): - return os.path.splitext(self.name)[0] + ':' + self.bid[:12] + return os.path.splitext(self.name)[0] + ':' + self.id[:12] - def from_list_entry(self, ent): - try: - service = ent.service - bid = manifest_id(ent.id) - date = time_ms(ent.date) if ent.date else None - version = long(ent.version) - filesize = long(ent.filesize) - fhash = filehash(ent.filehash) if filesize else None - sender = subscriber_id(ent.sender) - recipient = subscriber_id(ent.recipient) - name = ent.name if service == 'file' else None - except (KeyError, ValueError): - return None - self._fields = {} - self._fields['service'] = str(service) - self._fields['manifestid'] = str(bid) - self._fields['version'] = str(version) - self._fields['filesize'] = str(filesize) - if filesize: - self._fields['filehash'] = str(filehash) - if date: - self._fields['date'] = str(date) - if sender: - self._fields['sender'] = str(sender) - if recipient: - self._fields['recipient'] = str(recipient) - if name is not None: - self._fields['name'] = str(name) - self.service = service - self.bid = bid - self.version = version - self.filesize = filesize - self.filehash = fhash - self.name = name - return self + def suffix(self): + return os.path.splitext(self.name)[1] - def read(self, f): + @classmethod + def from_list_entry(cls, ent): + fieldmap = {} + fields = dict((fieldmap.get(key, key), value) for key, value in ent.__dict__.iteritems() if cls.is_fieldname(key) and len(value)) + return cls(**fields) + + @classmethod + def from_file(cls, f): + body, sig = f.read().split('\0', 1) + if body.endswith('\n'): + body = body[:-1] fields = {} try: - for line in f: - if not line.endswith('\n'): - return None + for line in body.split('\n'): field, value = line.split('=', 1) - fields[field] = value - service = manifest_id(fields['service']) - bid = manifest_id(fields['manifestid']) - version = long(fields['version']) - filesize = long(fields['filesize']) - fhash = filehash(fields['filehash']) if filesize else None - name = fields['name'] if service == 'file' else None + fields[cls.fieldname(field)] = value except (KeyError, ValueError): return None - self._fields = fields - self.service = service - self.bid = bid - self.version = version - self.filesize = filesize - self.filehash = fhash - self.name = name - return self + return cls(**fields) + + def succeeds(self, other): + return self.id == other.id and self.version > other.version + +def ulong(text): + n = long(text) + if n >= 0: + return n + raise ValueError('invalid literal for ulong(): %s' % (text,)) + +def str_nonempty(text): + s = str(text) + if len(s) > 0: + return s + raise ValueError('invalid literal for str_nonempty(): %s' % (text,)) def time_ms(text): try: @@ -188,26 +309,26 @@ def datetime_ms(text): def subscriber_id(text): try: if len(text) == 64: - return '%64X' % long(text, 16) + return '%064X' % long(text, 16) except ValueError: pass raise ValueError('invalid literal for subscriber_id(): %r' % (text,)) -def manifest_id(text): +def bundle_id(text): try: if len(text) == 64: - return '%64X' % long(text, 16) + return '%064X' % long(text, 16) except ValueError: pass - raise ValueError('invalid literal for manifest_id(): %r' % (text,)) + raise ValueError('invalid literal for bundle_id(): %r' % (text,)) -def filehash(text): +def file_hash(text): try: if len(text) == 128: - return '%128X' % long(text, 16) + return '%0128X' % long(text, 16) except ValueError: pass - raise ValueError('invalid literal for filehash(): %r' % (text,)) + raise ValueError('invalid literal for file_hash(): %r' % (text,)) class ServaldInterfaceException(Exception): def __init__(self, servald, output, msg): @@ -253,8 +374,10 @@ def servald_rhizome_hash(opts, path): out = invoke_servald(opts, args) if out is None: return None + if out.endswith('\n'): + out = out[:-1] try: - return filehash(out) + return file_hash(out) except ValueError: raise ServaldInterfaceException(opts.servald, out, 'invalid output, not a hex file hash') @@ -268,8 +391,7 @@ def servald_rhizome_extract(opts, bid, manifest_path=None, payload_path=None): args = ['rhizome', 'extract', 'manifest', bid, manifest_path] if not args: return None - keyval = invoke_servald(opts, args, output_keyvalue=True) - print keyval + return invoke_servald(opts, args, output_keyvalue=True) def invoke_servald(opts, args, output_keyvalue=False, output_words=False): env = dict(os.environ)