#!/usr/bin/env python

# Rhizome mirror daemon
# Copyright (C) 2013 Serval Project Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

"""This daemon regularly extracts file-sharing (service=file) bundles from
Rhizome into a mirror directory, and unpacks all bundles which are archive
format (zip, tar, tgz, etc.), into their constituent files.  If a newer version
of an already-unpacked bundle arrives, the prior unpack is deleted and the new
version unpacked in its place.  This effectively maintains the directory as an
up-to-date mirror of the local Rhizome store content.

NOTE(review): the code in this file extracts payloads only; it does not itself
unpack archive payloads into their constituent files.

On-disk file names are formed by concatenating the bundle name field (with any
path separators replaced by '_', and without its extension), ':', and the first
12 hex digits of the bundle ID, followed by the name's extension, in order to
avoid collisions.  Manifests are stored under the same stem with a '.manifest'
suffix in the '.manifests' subdirectory.

@author Andrew Bettison
"""

import sys
import errno
import time
import os
import os.path
import re
import argparse
import subprocess
import datetime
import fnmatch


def main():
    """Parse the command line, seed the mirror from disk, then poll Rhizome.

    With the default --interval of 0 the mirror is brought up to date once and
    the program exits; otherwise the update repeats every --interval seconds.
    """
    parser = argparse.ArgumentParser(description='Unpack Rhizome bundles into mirror directory.')
    parser.add_argument('--servald', dest='servald', default='servald',
                        help='Path of servald executable')
    parser.add_argument('--servald-instance', dest='instancepath',
                        default=os.environ.get('SERVALINSTANCE_PATH'),
                        help='Path of servald instance directory')
    parser.add_argument('--interval', dest='interval', type=float, default=0,
                        help='Seconds to sleep between polling Rhizome')
    parser.add_argument('--mirror-dir', dest='mirror_dir', required=True,
                        help='Path of directory to store extracted payloads')
    parser.add_argument('--filter-name', dest='name_filter',
                        help='Only mirror bundles whose names match given Glob pattern')
    # type=float: expire() adds this to a file mtime, so a value given on the
    # command line must not be left as a string.
    parser.add_argument('--expire-delay', dest='expire_delay', type=float, default=0,
                        help='Keep bundles in mirror for this many seconds after no longer listed by Rhizome')
    parser.add_argument('--paranoid', dest='paranoid', action='store_true',
                        help='Continually check for and correct corrupted mirror contents')
    opts = parser.parse_args()
    if opts.instancepath is None:
        fatal('missing --servald-instance option')
    if invoke_servald(opts, ['help']) is None:
        fatal('no executable servald')
    mirror = RhizomeMirror(opts)
    mirror.seed()
    while True:
        mirror.list()
        mirror.update()
        mirror.expire()
        if not opts.interval:
            break
        time.sleep(opts.interval)
    sys.exit(0)


class RhizomeMirror(object):
    """Keep a directory in step with the file-service bundles listed by Rhizome.

    Payloads live directly in opts.mirror_dir; manifests live in its
    '.manifests' subdirectory.  Both are named after the bundle's stem().
    """

    def __init__(self, opts):
        self.opts = opts
        # stem -> RhizomeManifest of the bundle currently present in the mirror
        self.extracted = {}
        # stem -> RhizomeManifest from the last list(); None if that list failed
        self.available = None
        self.payloads_path = opts.mirror_dir
        self.manifests_path = os.path.join(self.payloads_path, '.manifests')

    def manifest_path(self, manifest):
        """Return the mirror path of the given bundle's manifest file."""
        return os.path.join(self.manifests_path, manifest.stem()) + '.manifest'

    def payload_path(self, manifest):
        """Return the mirror path of the given bundle's payload file."""
        return os.path.join(self.payloads_path, manifest.stem()) + manifest.suffix()

    def seed(self):
        """Rebuild self.extracted from the manifests already in the mirror.

        Creates the manifests directory if needed.  Any entry in it that is not
        a parseable '*.manifest' file named after its own stem, or whose payload
        is missing or does not match, is deleted.
        """
        self.extracted = {}
        try:
            os.makedirs(self.manifests_path)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise
        for manifest_name in os.listdir(self.manifests_path):
            manifest_path = os.path.join(self.manifests_path, manifest_name)
            if manifest_name.endswith('.manifest'):
                stem = os.path.splitext(manifest_name)[0]
                manifest = RhizomeManifest.from_path(manifest_path)
                if manifest is not None and stem == manifest.stem():
                    if self.sync(manifest):
                        log('seeded %r' % (stem,))
                        self.extracted[stem] = manifest
                        continue
            # Remove invalid manifests.
            self.unlink(manifest_path)

    def sync(self, manifest):
        """Return True if the mirrored payload already matches the manifest.

        A zero-length bundle should have no payload file, so any stray file is
        removed.  A payload whose hash differs from the manifest's filehash is
        removed so that it will be extracted afresh.
        """
        payload_path = self.payload_path(manifest)
        if manifest.filesize == 0:
            self.unlink(payload_path)
            return True
        if os.path.exists(payload_path):
            if servald_rhizome_hash(self.opts, payload_path) == manifest.filehash:
                return True
            # Remove payload that does not match its manifest.
            self.unlink(payload_path)
        # Not synced -- have to extract the payload.
        return False

    def filter(self, manifest):
        """Return True if the bundle's name passes the --filter-name glob (if any)."""
        if self.opts.name_filter:
            if not fnmatch.fnmatch(manifest.name or '', self.opts.name_filter):
                return False
        return True

    def list(self):
        """Refresh self.available from 'servald rhizome list file'.

        self.available is left as None if servald failed or its output was
        unparseable, which makes update() and expire() do nothing.
        """
        self.available = None
        entries = servald_rhizome_list(self.opts)
        if entries is None:
            return
        self.available = {}
        for ent in entries:
            manifest = RhizomeManifest.from_list_entry(ent)
            if manifest is not None and manifest.service == 'file':
                self.available[manifest.stem()] = manifest

    def update(self):
        """Extract every listed bundle that is new, newer, or missing from the mirror."""
        if self.available is None:
            return
        for stem, manifest in self.available.items():
            if not self.filter(manifest):
                continue
            manifest_path = self.manifest_path(manifest)
            payload_path = self.payload_path(manifest)
            kwargs = self._what_to_extract(stem, manifest, manifest_path, payload_path)
            if not kwargs:
                continue
            if servald_rhizome_extract(self.opts, manifest.id, **kwargs):
                extracted_manifest = RhizomeManifest.from_path(manifest_path)
                if (extracted_manifest is None
                        or extracted_manifest.id != manifest.id
                        or extracted_manifest.version != manifest.version):
                    error('invalid manifest extracted for bid=%s' % (manifest.id,))
                    self.unlink(payload_path)
                    self.unlink(manifest_path)
                    self.extracted.pop(stem, None)
                else:
                    self.extracted[stem] = extracted_manifest

    def _what_to_extract(self, stem, manifest, manifest_path, payload_path):
        """Return servald_rhizome_extract() keyword arguments for one listed bundle.

        An empty dict means nothing needs extracting.  May unlink a stray
        payload file and refresh the manifest file's mtime as side effects.
        """
        extracted_manifest = self.extracted.get(stem)
        kwargs = {}
        if extracted_manifest is None or manifest.succeeds(extracted_manifest):
            # New (or newer) bundle: fetch its manifest, and its payload if any.
            kwargs['manifest_path'] = manifest_path
            if manifest.filesize == 0:
                self.unlink(payload_path)
            else:
                kwargs['payload_path'] = payload_path
        elif manifest.id == extracted_manifest.id:
            # Assume manifest and payload files are correct if present, unless
            # in 'paranoid' mode.
            if manifest.filesize == 0:
                self.unlink(payload_path)
            elif os.path.exists(payload_path):
                if self.opts.paranoid:
                    payload_hash = servald_rhizome_hash(self.opts, payload_path)
                    if payload_hash != manifest.filehash:
                        kwargs['payload_path'] = payload_path
            else:
                kwargs['payload_path'] = payload_path
            if os.path.exists(manifest_path):
                self.touch(manifest_path)  # Remember when this was last available
                if self.opts.paranoid:
                    check_manifest = RhizomeManifest.from_path(manifest_path)
                    if check_manifest is None or check_manifest != extracted_manifest:
                        kwargs['manifest_path'] = manifest_path
            else:
                kwargs['manifest_path'] = manifest_path
        # Otherwise a listed manifest has the same stem but a different bundle
        # ID; keep the already-extracted bundle (until expired).
        return kwargs

    def expire(self):
        """Remove mirrored bundles that Rhizome has stopped listing.

        A bundle is removed once its manifest file's mtime (refreshed by
        update() whenever an already-mirrored bundle is listed again) is more
        than --expire-delay seconds old.  Does nothing if the last list() failed.
        """
        if self.available is None:
            return
        now = time.time()
        # Iterate over a snapshot because entries are deleted inside the loop.
        for stem, extracted_manifest in tuple(self.extracted.items()):
            if stem in self.available:
                continue
            if extracted_manifest is None:
                # Nothing is left on disk for this stem; just forget it.
                del self.extracted[stem]
                continue
            manifest_path = self.manifest_path(extracted_manifest)
            payload_path = self.payload_path(extracted_manifest)
            if not os.path.exists(manifest_path):
                # Without its manifest, the payload is an orphan.
                self.unlink(payload_path)
                continue
            mtime = self.mtime(manifest_path)
            if mtime is not None and mtime + self.opts.expire_delay < now:
                self.unlink(payload_path)
                self.unlink(manifest_path)
                del self.extracted[stem]

    def touch(self, path):
        """Set the modification time of path to now, logging any failure."""
        try:
            os.utime(path, None)
        except OSError as e:
            error('cannot touch %r - %s' % (path, e))

    def unlink(self, path):
        """Delete path if it exists, logging the deletion and any failure."""
        if os.path.exists(path):
            log('unlink %r' % (path,))
            try:
                os.unlink(path)
            except OSError as e:
                error('cannot unlink %r - %s' % (path, e))

    def mtime(self, path):
        """Return the mtime of path, or None if it cannot be stat()ed.

        Errors other than 'file not found' are logged.
        """
        try:
            return os.stat(path).st_mtime
        except OSError as e:
            if e.errno != errno.ENOENT:
                error('cannot stat %r - %s' % (path, e))
        return None


class RhizomeManifest(object):
    """The parsed fields of a Rhizome bundle manifest.

    The constructor raises KeyError if service, id, or version is missing, or
    if filehash is missing when filesize is absent or non-zero.  It raises
    ValueError if a field value is malformed.  The raw fields are kept in _other.
    """

    def __init__(self, **fields):
        self.service = str_nonempty(fields['service'])
        self.id = bundle_id(fields['id'])
        self.version = ulong(fields['version'])
        self.filesize = ulong(fields['filesize']) if 'filesize' in fields else None
        self.date = time_ms(fields['date']) if 'date' in fields else None
        # A zero-length payload carries no file hash.
        self.filehash = file_hash(fields['filehash']) if self.filesize or self.filesize is None else None
        self.sender = subscriber_id(fields['sender']) if 'sender' in fields else None
        self.recipient = subscriber_id(fields['recipient']) if 'recipient' in fields else None
        self.name = str(fields['name']) if 'name' in fields else None
        self._other = fields

    def __eq__(self, other):
        """Manifests are equal when they carry exactly the same raw fields."""
        if not isinstance(other, RhizomeManifest):
            return NotImplemented
        return self._other == other._other

    def __ne__(self, other):
        # Python 2 does not derive != from ==.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __hash__(self):
        return hash(frozenset(self._other.items()))

    @staticmethod
    def fieldname(text):
        """Return text lower-cased if it is a valid (alphanumeric) field name."""
        if text.isalnum():
            return text.lower()
        raise ValueError('invalid literal for RhizomeManifest.fieldname(): %s' % (text,))

    @classmethod
    def is_fieldname(cls, text):
        try:
            cls.fieldname(text)
            return True
        except ValueError:
            return False

    def _safe_name(self):
        """Return the name field with path separators replaced by '_'.

        The name comes from the bundle, i.e. from outside this host, and is used
        to build paths inside the mirror directory; a separator or an absolute
        name would otherwise place files outside it.
        """
        name = self.name or ''
        for sep in ('/', os.sep, os.altsep):
            if sep:
                name = name.replace(sep, '_')
        return name

    def stem(self):
        """Return the file-name stem: name without extension, ':', first 12 ID digits."""
        return os.path.splitext(self._safe_name())[0] + ':' + self.id[:12]

    def suffix(self):
        """Return the extension of the (sanitised) name field, e.g. '.zip'."""
        return os.path.splitext(self._safe_name())[1]

    @classmethod
    def from_list_entry(cls, ent):
        """Build a manifest from a RhizomeListEntry, or return None if invalid.

        Only attributes with alphanumeric names and non-empty values are used.
        """
        fields = dict((cls.fieldname(key), value)
                      for key, value in ent.__dict__.items()
                      if cls.is_fieldname(key) and len(value))
        try:
            return cls(**fields)
        except (KeyError, ValueError):
            return None

    @classmethod
    def from_file(cls, f):
        """Parse a manifest from an open file, or return None if it is invalid.

        The body is the text before the first NUL (the signature block follows
        it); each body line has the form 'field=value'.  Accepts files opened in
        text or binary mode.
        """
        data = f.read()
        nul = b'\0' if isinstance(data, bytes) else '\0'
        body, sep, _sig = data.partition(nul)
        if not sep:
            return None
        if isinstance(body, bytes) and not isinstance(body, str):
            # Python 3 binary read: decode only the text part.
            body = body.decode('utf-8', 'replace')
        if body.endswith('\n'):
            body = body[:-1]
        fields = {}
        try:
            for line in body.split('\n'):
                field, value = line.split('=', 1)
                fields[cls.fieldname(field)] = value
            return cls(**fields)
        except (KeyError, ValueError):
            return None

    @classmethod
    def from_path(cls, path):
        """Parse the manifest file at path; None if unreadable or invalid."""
        try:
            with open(path, 'rb') as f:
                return cls.from_file(f)
        except (IOError, OSError):
            return None

    def succeeds(self, other):
        """Return True if this is a newer version of the same bundle as other."""
        return self.id == other.id and self.version > other.version


def ulong(text):
    """Parse a non-negative decimal integer."""
    n = int(text)
    if n >= 0:
        return n
    raise ValueError('invalid literal for ulong(): %s' % (text,))


def str_nonempty(text):
    s = str(text)
    if len(s) > 0:
        return s
    raise ValueError('invalid literal for str_nonempty(): %s' % (text,))


def time_ms(text):
    """Parse a decimal integer time stamp in milliseconds."""
    try:
        return int(text)
    except ValueError:
        pass
    raise ValueError('invalid literal for time_ms(): %r' % (text,))


def datetime_ms(text):
    """Parse a millisecond time stamp into a local naive datetime."""
    try:
        ms = time_ms(text)
        return datetime.datetime.fromtimestamp(ms // 1000).replace(microsecond=(ms % 1000) * 1000)
    except (ValueError, OverflowError, OSError):
        pass
    raise ValueError('invalid literal for datetime_ms(): %r' % (text,))


def _hex_digits(text, ndigits):
    """Return text upper-cased if it is exactly ndigits hex digits, else None."""
    if len(text) == ndigits and re.match(r'[0-9A-Fa-f]+\Z', text):
        return text.upper()
    return None


def subscriber_id(text):
    """Parse a 64-hex-digit subscriber ID, returned upper-case."""
    sid = _hex_digits(text, 64)
    if sid is not None:
        return sid
    raise ValueError('invalid literal for subscriber_id(): %r' % (text,))


def bundle_id(text):
    """Parse a 64-hex-digit bundle ID, returned upper-case."""
    bid = _hex_digits(text, 64)
    if bid is not None:
        return bid
    raise ValueError('invalid literal for bundle_id(): %r' % (text,))


def file_hash(text):
    """Parse a 128-hex-digit file hash, returned upper-case."""
    fhash = _hex_digits(text, 128)
    if fhash is not None:
        return fhash
    raise ValueError('invalid literal for file_hash(): %r' % (text,))


class ServaldInterfaceException(Exception):
    """servald produced output this script cannot interpret."""

    def __init__(self, servald, output, msg):
        Exception.__init__(self, msg)
        self.output = output
        self.servald = servald


class RhizomeListEntry(object):
    """One row of 'servald rhizome list' output; columns become attributes."""

    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)

    def __repr__(self):
        return '%s(%s)' % (self.__class__.__name__, ', '.join('%s=%r' % i for i in self.__dict__.items()))


def servald_rhizome_list(opts):
    """Run 'servald rhizome list file' and return a list of RhizomeListEntry.

    The delimited output is: the column count, that many header words, then
    the rows.  Header characters outside [A-Za-z0-9_] are replaced by '_' to
    form attribute names.  Returns None (after logging) on failure.
    """
    args = ['rhizome', 'list', 'file']
    words = invoke_servald(opts, args, output_words=True)
    if words is None:
        return None
    try:
        if len(words) < 1:
            raise ValueError('missing first word')
        ncols = int(words[0])
        if ncols < 1:
            raise ValueError('invalid column count %d' % (ncols,))
        if len(words) < 1 + ncols:
            raise ValueError('missing column header')
        if (len(words) - (1 + ncols)) % ncols != 0:
            raise ValueError('incomplete row')
        colmap = {}
        for col, hdr in enumerate(words[1:ncols + 1]):
            colmap[col] = re.sub(r'[^A-Za-z0-9_]', '_', hdr)
        rows = []
        for i in range(ncols + 1, len(words), ncols):
            ent = RhizomeListEntry()
            for col in range(ncols):
                setattr(ent, colmap[col], words[i + col])
            rows.append(ent)
        return rows
    except ValueError as e:
        error('invalid output from %s: %s' % (' '.join([opts.servald] + args), e))
        return None


def servald_rhizome_hash(opts, path):
    """Return the Rhizome file hash of path, or None if servald failed.

    Raises ServaldInterfaceException if the output is not a file hash.
    """
    args = ['rhizome', 'hash', 'file', path]
    out = invoke_servald(opts, args)
    if out is None:
        return None
    if out.endswith('\n'):
        out = out[:-1]
    try:
        return file_hash(out)
    except ValueError:
        raise ServaldInterfaceException(opts.servald, out, 'invalid output, not a hex file hash')


def servald_rhizome_extract(opts, bid, manifest_path=None, payload_path=None):
    """Extract a bundle's manifest and/or payload to the given paths.

    Uses 'extract bundle' when both paths are given, otherwise 'extract file'
    or 'extract manifest'.  Returns None if neither path is given, else the
    key/value dict from invoke_servald (None on failure).
    """
    args = None
    if payload_path and manifest_path:
        args = ['rhizome', 'extract', 'bundle', bid, manifest_path, payload_path]
    elif payload_path:
        args = ['rhizome', 'extract', 'file', bid, payload_path]
    elif manifest_path:
        args = ['rhizome', 'extract', 'manifest', bid, manifest_path]
    if not args:
        return None
    return invoke_servald(opts, args, output_keyvalue=True)


def invoke_servald(opts, args, output_keyvalue=False, output_words=False):
    """Run servald with args against opts.instancepath and return its output.

    Returns None (after logging) if servald cannot be executed or exits with a
    non-zero status.  Otherwise returns stdout as a string; as a list of words
    if output_words; or as a dict of alternating key/value words if
    output_keyvalue.  Raises ServaldInterfaceException if delimited output is
    malformed.
    """
    env = dict(os.environ)
    delim = '\x01'
    if output_words or output_keyvalue:
        env['SERVALD_OUTPUT_DELIMITER'] = delim
    env['SERVALD_INSTANCEPATH'] = opts.instancepath
    # main() takes the --servald-instance default from SERVALINSTANCE_PATH;
    # set that too so an explicit option overrides an inherited value.
    # NOTE(review): confirm which of the two variables servald honours.
    env['SERVALINSTANCE_PATH'] = opts.instancepath
    allargs = (opts.servald,) + tuple(args)
    log('execute ' + ' '.join(map(repr, allargs)))
    try:
        # universal_newlines: text (not bytes) output on Python 3 as well.
        proc = subprocess.Popen(allargs,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                env=env,
                                universal_newlines=True)
        out, err = proc.communicate()
    except OSError as e:
        error('cannot execute %s - %s' % (opts.servald, e))
        return None
    if proc.returncode != 0:
        error('%s exited with status %d' % (os.path.basename(opts.servald), proc.returncode))
        for line in err.split('\n'):
            if line.startswith('ERROR:') or line.startswith('WARN:'):
                error(re.sub(r'^(ERROR|WARN):\s*(\[\d+])?\s*\d\d\:\d\d\:\d\d\.\d+\s*', '', line))
        return None
    if out is None:
        return None
    if output_words or output_keyvalue:
        if not out.endswith(delim):
            raise ServaldInterfaceException(opts.servald, out, 'missing delimiter')
        out = out[:-1]
        words = out.split(delim)
        if output_keyvalue:
            if len(words) % 2 != 0:
                raise ServaldInterfaceException(opts.servald, out, 'odd number of output fields')
            return dict(zip(words[0::2], words[1::2]))
        return words
    return out


def log(msg):
    """Write a progress message to standard output."""
    print('+ %s' % (msg,))


def error(msg):
    """Write an error message, prefixed with the program name, to standard error."""
    sys.stderr.write('%s: %s\n' % (os.path.basename(sys.argv[0]), msg))


def fatal(msg):
    """Report msg as an error and exit with status 1."""
    error(msg)
    sys.exit(1)


if __name__ == '__main__':
    main()