#!/usr/bin/env python

# Rhizome mirror daemon
# Copyright (C) 2013 Serval Project Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

"""This daemon regularly extracts file-sharing (service=file) bundles from
Rhizome into a mirror directory, and unpacks all bundles which are archive
format (zip, tar, tgz, etc.), into their constituent files.  If a newer version
of an already-unpacked bundle arrives, the prior unpack is deleted and the new
version unpacked in its place.  This effectively maintains the directory as an
up-to-date mirror of the local Rhizome store content.

On-disk file/directory names are formed by concatenating the bundle name field
and the bundle ID separated by ':', in order to avoid collisions.

@author Andrew Bettison
"""

import sys
import errno
import time
import os
import os.path
import re
import argparse
import subprocess
import datetime
import shutil
import fnmatch
import zipfile
import tarfile


def main():
    """Parse the command line, check that servald works, then poll Rhizome.

    Runs a single list/update/expire pass when --interval is 0, otherwise
    repeats the pass forever, sleeping --interval seconds between passes.
    """
    global log_output
    instancepath = os.environ.get('SERVALINSTANCE_PATH')
    parser = argparse.ArgumentParser(description='Continuously extract Rhizome store into mirror directory.')
    parser.add_argument('--mirror-dir', dest='mirror_dir', metavar='PATH', required=True, help='Path of directory to store extracted payloads')
    parser.add_argument('--servald', dest='servald', metavar='PATH', required=True, help='Path of servald executable')
    parser.add_argument('--instance', dest='instancepath', metavar='PATH', required=not instancepath, default=instancepath, help='Path of servald instance directory')
    parser.add_argument('--interval', dest='interval', metavar='N', type=float, default=0, help='Sleep N seconds between polling Rhizome; N=0 means run once and exit')
    parser.add_argument('--filter-name', dest='name_filter', metavar='GLOB', help='Only mirror bundles whose names match GLOB pattern')
    parser.add_argument('--unpack-dir', dest='unpack_dir', metavar='PATH', default=None, help='Path of directory in which to unpack bundles (zip, tar, etc.)')
    parser.add_argument('--exec-on-unpack', dest='exec_on_unpack', metavar='EXECUTABLE', help='Run EXECUTABLE after unpacking one or more bundles')
    parser.add_argument('--expire-delay', dest='expire_delay', metavar='N', type=int, default=0, help='Keep bundles in mirror for N seconds after no longer listed by Rhizome')
    parser.add_argument('--error-retry', dest='error_retry', metavar='N', type=int, default=600, help='Wait N seconds before retrying failed operations')
    parser.add_argument('--paranoid', dest='paranoid', action='store_true', help='Continually check for and correct corrupted mirror contents')
    parser.add_argument('--log-to-stdout', dest='log_to_stdout', action='store_true', help='Log activity on standard output')
    opts = parser.parse_args()

    # The option promises standard output, so log there (not to stderr).
    log_output = sys.stdout if opts.log_to_stdout else None

    # Sanity check: servald must be executable and answer 'help' successfully.
    try:
        status, output = invoke_servald(opts, ['help'])
    except ServaldInterfaceException as e:
        fatal(e)
    if status is None:
        fatal('no executable servald')
    if status != 0 or output is None:
        fatal('faulty servald')

    mirror = RhizomeMirror(opts)
    mirror.clean_unpack_dir()
    mirror.seed()
    while True:
        mirror.list()
        mirror.update()
        mirror.expire()
        if mirror.unpacked:
            if opts.exec_on_unpack:
                args = [opts.exec_on_unpack] + sorted(mirror.unpacked)
                log("execute " + ' '.join(args))
                try:
                    # Close the null-device handle once the child has finished.
                    with open(os.devnull) as devnull:
                        subprocess.call(args, stdin=devnull)
                    # Only forget the unpacked set once the hook has run, so a
                    # failed launch is retried with the accumulated paths.
                    mirror.unpacked.clear()
                except (OSError, IOError) as e:
                    error('cannot execute %r - %s' % (opts.exec_on_unpack, e))
        if not opts.interval:
            break
        time.sleep(opts.interval)
    sys.exit(0)


def _is_safe_member_name(name):
    """Return True if archive member `name` stays inside the unpack directory.

    Rejects empty names, absolute paths and any name with a '..' component
    (using either '/' or '\\' as separator).
    """
    if not name or name.startswith('/') or '../' in name:
        return False
    return '..' not in name.replace('\\', '/').split('/')


class RhizomeMirror(object):
    """On-disk mirror of the 'file' service bundles listed by servald.

    Payloads are stored in opts.mirror_dir, their manifests in the
    '.manifests' sub-directory, and (if opts.unpack_dir is set) archive
    payloads are unpacked below opts.unpack_dir.
    """

    def __init__(self, opts):
        self.opts = opts
        self.hash_errors = {}     # bundle id -> time of last failed hash attempt
        self.extract_errors = {}  # bundle id -> time of last failed extraction
        self.extracted = {}       # stem -> RhizomeManifest on disk (or None)
        self.available = None     # stem -> RhizomeManifest listed by servald; None if listing failed
        self.unpacked = set()     # unpack directories created since last exec hook
        self.payloads_path = opts.mirror_dir
        self.manifests_path = os.path.join(self.payloads_path, '.manifests')
        self.unpacks_path = opts.unpack_dir

    def manifest_path(self, manifest):
        """Return the path of the mirrored manifest file for `manifest`."""
        return os.path.join(self.manifests_path, manifest.stem()) + '.manifest'

    def payload_path(self, manifest):
        """Return the path of the mirrored payload file for `manifest`."""
        return os.path.join(self.payloads_path, manifest.stem()) + manifest.suffix()

    def unpack_path(self, manifest):
        """Return the unpack directory for `manifest`, or None if unpacking is disabled."""
        if not self.unpacks_path:
            return None
        return os.path.join(self.unpacks_path, manifest.stem())

    def unpack_path_tmp(self, manifest, var):
        """Return a '.tmp-<var>-<stem>' sibling of the unpack directory, or None."""
        unpack_path = self.unpack_path(manifest)
        if not unpack_path:
            return None
        head, tail = os.path.split(unpack_path)
        return os.path.join(head, '.tmp-%s-%s' % (var, tail))

    def clean_unpack_dir(self):
        """Remove '.tmp-*' leftovers from the unpack directory."""
        if self.unpacks_path and os.path.isdir(self.unpacks_path):
            for name in os.listdir(self.unpacks_path):
                if name.startswith('.tmp-'):
                    path = os.path.join(self.unpacks_path, name)
                    # Temporary unpack areas are directories, which os.unlink
                    # cannot remove.
                    if os.path.isdir(path) and not os.path.islink(path):
                        self.rmtree(path)
                    else:
                        self.unlink(path)

    def seed(self):
        """Rebuild self.extracted from the manifests already on disk.

        Every entry of the manifests directory that is not a valid, correctly
        named manifest whose payload is in sync is removed, so that the bundle
        gets extracted afresh by update().
        """
        self.extracted = {}
        if self.mkdirs(self.manifests_path) is not None:
            for manifest_name in os.listdir(self.manifests_path):
                manifest_path = os.path.join(self.manifests_path, manifest_name)
                if manifest_name.endswith('.manifest'):
                    stem = os.path.splitext(manifest_name)[0]
                    manifest = RhizomeManifest.from_path(manifest_path)
                    if manifest is not None and stem == manifest.stem():
                        if self.sync(manifest):
                            log('seeded %r' % (stem,))
                            self.extracted[stem] = manifest
                            continue
                # Remove invalid manifests.
                self.unlink(manifest_path)

    def _ensure_unpacked(self, manifest):
        """Unpack the payload of `manifest` if unpacking is enabled and not yet done."""
        unpack_dir = self.unpack_path(manifest)
        if unpack_dir is not None and not os.path.isdir(unpack_dir):
            self.unpack_payload(manifest)

    def _hash_payload(self, manifest, payload_path):
        """Return the hash of the payload file, or None if it cannot be computed.

        After a failure, no new attempt is made for the same bundle until
        opts.error_retry seconds have passed.
        """
        payload_hash = None
        if self.hash_errors.get(manifest.id, 0) + self.opts.error_retry < time.time():
            payload_hash = servald_rhizome_hash(self.opts, payload_path)
            if payload_hash is None:
                self.hash_errors[manifest.id] = time.time()
        return payload_hash

    def sync(self, manifest):
        """Return True if the on-disk payload is known to match `manifest`.

        A manifest with an empty payload is always in sync (any stray payload
        file is removed).  A payload whose hash does not match is removed.
        Returns False whenever the payload has to be extracted (again).
        """
        payload_path = self.payload_path(manifest)
        if manifest.filesize == 0:
            self.unlink(payload_path)
            return True
        elif os.path.exists(payload_path):
            payload_hash = self._hash_payload(manifest, payload_path)
            if payload_hash is None:
                # Can't tell if payload matches manifest or not.
                pass
            elif payload_hash == manifest.filehash:
                # This logic is DEFECTIVE for the case of encrypted payload, in which the hash is
                # for the ciphertext, not the clear text, but we are extracting the clear text, so
                # the hash will never match.
                self._ensure_unpacked(manifest)
                return True
            else:
                # Remove payload that does not match its manifest.
                self.unlink(payload_path)
        # Not synced -- have to extract the payload.
        return False

    def filter(self, manifest):
        """Return True if `manifest` passes the --filter-name glob (if any)."""
        if self.opts.name_filter:
            if not fnmatch.fnmatch(manifest.name or '', self.opts.name_filter):
                return False
        return True

    def list(self):
        """Refresh self.available from servald; set it to None if listing fails."""
        self.available = None
        entries = servald_rhizome_list(self.opts)
        if entries is not None:
            self.available = {}
            for ent in entries:
                manifest = RhizomeManifest.from_list_entry(ent)
                if manifest is not None and manifest.service == 'file':
                    stem = manifest.stem()
                    self.available[stem] = manifest

    def update(self):
        """Extract every listed bundle that is new, newer or damaged on disk."""
        if self.available is None:
            return
        for stem, manifest in self.available.items():
            if not self.filter(manifest):
                continue
            manifest_path = self.manifest_path(manifest)
            payload_path = self.payload_path(manifest)
            extracted_manifest = self.extracted.get(stem)
            # kwargs collects which of the two files must be (re)extracted.
            kwargs = {}
            if extracted_manifest is None or manifest.succeeds(extracted_manifest):
                kwargs['manifest_path'] = manifest_path
                if manifest.filesize == 0:
                    self.unlink(payload_path)
                else:
                    kwargs['payload_path'] = payload_path
            elif manifest.id == extracted_manifest.id:
                # Assume manifest and payload files are correct if present, unless
                # in 'paranoid' mode.
                if manifest.filesize == 0:
                    self.unlink(payload_path)
                elif os.path.exists(payload_path):
                    payload_hash = None
                    if self.opts.paranoid:
                        payload_hash = self._hash_payload(manifest, payload_path)
                    if payload_hash is None or payload_hash == manifest.filehash:
                        # Payload is consistent with manifest.  Check that it is
                        # unpacked.
                        self._ensure_unpacked(manifest)
                    else:
                        # This logic is DEFECTIVE for the case of encrypted payload, in
                        # which the hash is for the ciphertext, not the clear text, but
                        # we are extracting the clear text, so the hash will never
                        # match.
                        kwargs['payload_path'] = payload_path
                else:
                    kwargs['payload_path'] = payload_path
                if os.path.exists(manifest_path):
                    self.touch(manifest_path)  # Remember when this was last available
                    if self.opts.paranoid:
                        check_manifest = RhizomeManifest.from_path(manifest_path)
                        if check_manifest is None or check_manifest != extracted_manifest:
                            kwargs['manifest_path'] = manifest_path
                else:
                    kwargs['manifest_path'] = manifest_path
            else:
                # Ignore listed manifests with the same stem but different bundle ID; keep
                # the already-extracted bundle (until expired).
                pass
            if not kwargs:
                continue
            extracted = None
            if self.extract_errors.get(manifest.id, 0) + self.opts.error_retry < time.time():
                extracted = servald_rhizome_extract(self.opts, manifest.id, **kwargs)
                if extracted is None:
                    self.extract_errors[manifest.id] = time.time()
            if not extracted:
                continue
            # Verify what servald wrote before trusting it.
            extracted_manifest = RhizomeManifest.from_path(manifest_path)
            if (extracted_manifest is None
                    or extracted_manifest.id != manifest.id
                    or extracted_manifest.version != manifest.version):
                error('invalid manifest extracted for bid=%s' % (manifest.id,))
                self.unlink(payload_path)
                self.unlink(manifest_path)
                self.extracted[stem] = None
                self.extract_errors[manifest.id] = time.time()
            else:
                self.extracted[stem] = extracted_manifest
                if 'payload_path' in kwargs:
                    self.unpack_payload(manifest)

    def unpack_payload(self, manifest):
        """Unpack the payload of `manifest` if it is a zip or tar archive.

        The archive is extracted into a temporary directory which then
        replaces any previous unpack directory.  Returns True if a new unpack
        directory was installed, None if the archive could not be read, and
        False otherwise (unpacking disabled, payload is not an archive, or a
        directory operation failed).
        """
        unpack_dir = self.unpack_path(manifest)
        if unpack_dir is None:
            # No --unpack-dir was given: nothing to do.
            return False
        unpack_dir_new = self.unpack_path_tmp(manifest, 'new')
        unpack_dir_old = self.unpack_path_tmp(manifest, 'old')
        self.rmtree(unpack_dir_new)
        self.rmtree(unpack_dir_old)
        if self.mkdirs(unpack_dir_new) is not None:
            payload_path = self.payload_path(manifest)
            try:
                if zipfile.is_zipfile(payload_path):
                    log('unzip %s into %s' % (payload_path, unpack_dir_new))
                    with zipfile.ZipFile(payload_path) as zf:
                        for name in zf.namelist():
                            if _is_safe_member_name(name):
                                log(' extract %s' % name)
                                zf.extract(name, unpack_dir_new)
                            else:
                                log(' skip %s' % name)
                elif tarfile.is_tarfile(payload_path):
                    log('untar %s into %s' % (payload_path, unpack_dir_new))
                    with tarfile.open(payload_path) as tf:
                        for member in tf.getmembers():
                            # Bundles are untrusted input: only plain files and
                            # directories are extracted, so that link or device
                            # members cannot redirect writes outside the unpack
                            # directory.
                            if (_is_safe_member_name(member.name)
                                    and (member.isfile() or member.isdir())):
                                log(' extract %s' % member.name)
                                tf.extract(member, unpack_dir_new)
                            else:
                                log(' skip %s' % member.name)
                else:
                    return False
                moved_old = bool(os.path.exists(unpack_dir)
                                 and self.rename(unpack_dir, unpack_dir_old))
                if self.rename(unpack_dir_new, unpack_dir):
                    self.unpacked.add(unpack_dir)
                    return True
                if moved_old:
                    # Installing the new unpack failed: put the previous one back
                    # rather than letting the cleanup below delete it.
                    self.rename(unpack_dir_old, unpack_dir)
            except zipfile.BadZipfile as e:
                error('cannot unzip %s: %s' % (payload_path, e))
                return None
            except tarfile.TarError as e:
                error('cannot untar %s: %s' % (payload_path, e))
                return None
            except (IOError, OSError) as e:
                error('cannot unpack %s: %s' % (payload_path, e))
                return None
            finally:
                self.rmtree(unpack_dir_new)
                self.rmtree(unpack_dir_old)
        return False

    def expire(self):
        """Delete mirrored bundles that servald no longer lists.

        A bundle is deleted once its manifest file has not been touched for
        more than opts.expire_delay seconds.  Nothing is expired if the last
        listing failed.
        """
        now = time.time()
        if self.available is not None:
            for stem, extracted_manifest in list(self.extracted.items()):
                if extracted_manifest is not None and stem not in self.available:
                    manifest_path = self.manifest_path(extracted_manifest)
                    payload_path = self.payload_path(extracted_manifest)
                    if os.path.exists(manifest_path):
                        last_seen = self.mtime(manifest_path)
                        # mtime() returns None if the file cannot be examined;
                        # keep the bundle in that case.
                        if last_seen is not None and last_seen + self.opts.expire_delay < now:
                            self.unlink(payload_path)
                            self.unlink(manifest_path)
                            self.extracted[stem] = None
                    else:
                        self.unlink(payload_path)

    def mkdirs(self, path):
        """Create directory `path`; return True if created, False if present, None on error."""
        if os.path.isdir(path):
            return False
        log('mkdirs %r' % (path,))
        try:
            os.makedirs(path)
            return True
        except OSError as e:
            error('cannot mkdir -p %r - %s' % (path, e))
            return None

    def rmtree(self, path):
        """Remove tree `path`; return True if removed, False if absent, None on error."""
        if not os.path.exists(path):
            return False
        log('rmdirs %r' % (path,))
        try:
            shutil.rmtree(path)
            return True
        except OSError as e:
            error('cannot rm -r %r - %s' % (path, e))
            return None

    def touch(self, path):
        """Set the access and modification times of `path` to the current time."""
        try:
            # Merely opening and closing a file leaves its mtime unchanged, so
            # set the timestamps explicitly; expire() depends on them.
            os.utime(path, None)
        except OSError as e:
            error('cannot touch %r - %s' % (path, e))

    def rename(self, path, newpath):
        """Rename `path`; return True if renamed, False if absent, None on error."""
        if not os.path.exists(path):
            return False
        log('rename %r %r' % (path, newpath))
        try:
            os.rename(path, newpath)
            return True
        except OSError as e:
            error('cannot rename %r to %r - %s' % (path, newpath, e))
            return None

    def unlink(self, path):
        """Unlink file `path`; return True if removed, False if absent, None on error."""
        if not os.path.exists(path):
            return False
        log('unlink %r' % (path,))
        try:
            os.unlink(path)
            return True
        except OSError as e:
            error('cannot unlink %r - %s' % (path, e))
            return None

    def mtime(self, path):
        """Return the modification time of `path`, or None if it cannot be read."""
        try:
            return os.stat(path).st_mtime
        except OSError as e:
            if e.errno != errno.ENOENT:
                error('cannot stat %r - %s' % (path, e))
            return None


class RhizomeManifest(object):
    """Validated fields of a Rhizome manifest.

    The constructor raises KeyError if 'service', 'id' or 'version' is
    missing (or 'filehash' when no 'filesize' is given), and ValueError if a
    field value is malformed.
    """

    # Instances compare by value and are mutable, so they are not hashable.
    __hash__ = None

    def __init__(self, **fields):
        self.service = str_nonempty(fields['service'])
        self.id = bundle_id(fields['id'])
        self.version = ulong(fields['version'])
        self.filesize = ulong(fields['filesize']) if 'filesize' in fields else None
        self.date = time_ms(fields['date']) if 'date' in fields else None
        # A file hash is only required when the payload is (or may be) non-empty.
        self.filehash = file_hash(fields['filehash']) if self.filesize or self.filesize is None else None
        self.sender = subscriber_id(fields['sender']) if 'sender' in fields else None
        self.recipient = subscriber_id(fields['recipient']) if 'recipient' in fields else None
        self.name = str(fields['name']) if 'name' in fields else None
        self._other = fields

    @staticmethod
    def fieldname(text):
        """Return the canonical (lower-case) field name; raise ValueError if not alphanumeric."""
        if text.isalnum():
            return text.lower()
        raise ValueError('invalid literal for RhizomeManifest.fieldname(): %s' % (text,))

    @classmethod
    def is_fieldname(cls, text):
        """Return True if `text` is a valid manifest field name."""
        try:
            cls.fieldname(text)
            return True
        except ValueError:
            return False

    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return NotImplemented
        return (self.service == other.service
                and self.id == other.id
                and self.version == other.version
                and self.filesize == other.filesize
                and self.date == other.date
                and self.filehash == other.filehash
                and self.sender == other.sender
                and self.recipient == other.recipient
                and self.name == other.name
                and self._other == other._other)

    def __ne__(self, other):
        if not isinstance(other, type(self)):
            return NotImplemented
        return not self.__eq__(other)

    def _filename(self):
        """Return the bundle name made safe for use as a single path component.

        The name comes from the bundle itself, so path separators and NUL are
        replaced to keep mirrored files inside the mirror directory.  A
        missing name is treated as the empty string.
        """
        name = self.name or ''
        for ch in ('/', '\0', os.sep, os.altsep):
            if ch:
                name = name.replace(ch, '_')
        return name

    def stem(self):
        """Return '<name without extension>:<first 12 digits of bundle id>'."""
        return os.path.splitext(self._filename())[0] + ':' + self.id[:12]

    def suffix(self):
        """Return the extension of the bundle name (including the dot), or ''."""
        return os.path.splitext(self._filename())[1]

    @classmethod
    def from_list_entry(cls, ent):
        """Build a manifest from a RhizomeListEntry; return None if it is invalid.

        Only non-empty attributes with valid field names are used.
        """
        fieldmap = {}
        fields = dict((fieldmap.get(key, key), value)
                      for key, value in vars(ent).items()
                      if cls.is_fieldname(key) and len(value))
        try:
            return cls(**fields)
        except (KeyError, ValueError, TypeError):
            return None

    @classmethod
    def from_file(cls, f):
        """Parse a manifest from file object `f`; return None if it is invalid.

        The text before the first NUL is read as 'field=value' lines; what
        follows the NUL is ignored.  Content without a NUL is rejected.
        """
        data = f.read()
        if isinstance(data, bytes) and not isinstance(data, str):
            # Python 3 binary read; the bytes after the NUL are discarded
            # below, so undecodable bytes there are harmless.
            data = data.decode('utf-8', 'replace')
        parts = data.split('\0', 1)
        if len(parts) != 2:
            return None
        body = parts[0]
        if body.endswith('\n'):
            body = body[:-1]
        fields = {}
        try:
            for line in body.split('\n'):
                field, value = line.split('=', 1)
                fields[cls.fieldname(field)] = value
            return cls(**fields)
        except (KeyError, ValueError, TypeError):
            return None

    @classmethod
    def from_path(cls, path):
        """Parse the manifest file at `path`; return None if unreadable or invalid."""
        try:
            with open(path, 'rb') as f:
                return cls.from_file(f)
        except (IOError, OSError) as e:
            error('cannot read %r - %s' % (path, e))
            return None

    def succeeds(self, other):
        """Return True if this is a newer version of the same bundle as `other`."""
        return self.id == other.id and self.version > other.version


def ulong(text):
    """Convert `text` to a non-negative integer; raise ValueError otherwise."""
    n = int(text)
    if n >= 0:
        return n
    raise ValueError('invalid literal for ulong(): %s' % (text,))


def str_nonempty(text):
    """Convert `text` to a string; raise ValueError if the result is empty."""
    s = str(text)
    if len(s) > 0:
        return s
    raise ValueError('invalid literal for str_nonempty(): %s' % (text,))


def time_ms(text):
    """Convert `text` to an integer (a time in milliseconds, per the name)."""
    try:
        return int(text)
    except ValueError:
        pass
    raise ValueError('invalid literal for time_ms(): %r' % (text,))


def datetime_ms(text):
    """Convert a millisecond timestamp `text` to a local datetime.datetime."""
    try:
        ms = time_ms(text)
        # The sub-second remainder is in milliseconds; datetime wants microseconds.
        return datetime.datetime.fromtimestamp(ms // 1000).replace(microsecond=(ms % 1000) * 1000)
    except ValueError:
        pass
    raise ValueError('invalid literal for datetime_ms(): %r' % (text,))


def _hex_field(text, ndigits, what):
    """Return `text` upper-cased if it is exactly `ndigits` hex digits.

    Raises ValueError naming the converter `what` otherwise.
    """
    try:
        if re.match(r'[0-9A-Fa-f]{%d}\Z' % (ndigits,), text):
            return text.upper()
    except TypeError:
        pass
    raise ValueError('invalid literal for %s(): %r' % (what, text))


def subscriber_id(text):
    """Validate a 64-hex-digit subscriber ID and return it in upper case."""
    return _hex_field(text, 64, 'subscriber_id')


def bundle_id(text):
    """Validate a 64-hex-digit bundle ID and return it in upper case."""
    return _hex_field(text, 64, 'bundle_id')


def file_hash(text):
    """Validate a 128-hex-digit file hash and return it in upper case."""
    return _hex_field(text, 128, 'file_hash')


class ServaldInterfaceException(Exception):
    """Raised when servald exits with an error or produces malformed output."""

    def __init__(self, servald, status, output, msg):
        super(ServaldInterfaceException, self).__init__('%s (exit status %d): %s' % (servald, status, msg))
        self.status = status
        self.output = output
        self.servald = servald


class RhizomeListEntry(object):
    """One row of 'rhizome list' output, with one attribute per column."""

    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)

    def __repr__(self):
        return '%s(%s)' % (self.__class__.__name__, ', '.join('%s=%r' % i for i in self.__dict__.items()))


def servald_rhizome_list(opts):
    """Return the rows of 'servald rhizome list file' as RhizomeListEntry objects.

    The output is expected to be a column count, then that many column
    headers, then the rows.  Returns None (after reporting the error) if
    servald fails or the output is malformed.
    """
    args = ['rhizome', 'list', 'file']
    try:
        status, words = invoke_servald(opts, args, output_words=True)
    except ServaldInterfaceException as e:
        error(e)
        return None
    if words is None:
        return None
    try:
        if len(words) < 1:
            raise ValueError('missing first word')
        ncols = int(words[0])
        if ncols < 1:
            # Also protects the modulo below against division by zero.
            raise ValueError('invalid column count')
        if len(words) < 1 + ncols:
            raise ValueError('missing column header')
        if (len(words) - (1 + ncols)) % ncols != 0:
            raise ValueError('incomplete row')
        # Turn column headers into valid attribute names.
        colnames = [re.sub(r'[^A-Za-z0-9_]', '_', hdr) for hdr in words[1:ncols + 1]]
        rows = []
        for i in range(ncols + 1, len(words), ncols):
            ent = RhizomeListEntry()
            for col, colname in enumerate(colnames):
                setattr(ent, colname, words[i + col])
            rows.append(ent)
        return rows
    except ValueError as e:
        error('invalid output from %s: %s' % (' '.join([opts.servald,] + args), e))
        return None


def servald_rhizome_hash(opts, path):
    """Return the hash servald computes for the file at `path`.

    Returns None (after reporting the error) if servald fails or its output
    is not a valid file hash.
    """
    args = ['rhizome', 'hash', 'file', path]
    try:
        status, out = invoke_servald(opts, args)
    except ServaldInterfaceException as e:
        error(e)
        return None
    if out is None:
        return None
    if out.endswith('\n'):
        out = out[:-1]
    try:
        return file_hash(out)
    except ValueError:
        # Report instead of raising: callers treat None as "hash unavailable"
        # and retry later.
        error(ServaldInterfaceException(opts.servald, status, out, 'invalid output, not a hex file hash'))
        return None


def servald_rhizome_extract(opts, bid, manifest_path=None, payload_path=None):
    """Ask servald to extract the manifest and/or payload of bundle `bid`.

    Returns True if servald exited with status 0, False for any other
    status, and None if no path was given or servald could not be used.
    """
    args = None
    if payload_path and manifest_path:
        args = ['rhizome', 'extract', 'bundle', bid, manifest_path, payload_path]
    elif payload_path:
        args = ['rhizome', 'extract', 'file', bid, payload_path]
    elif manifest_path:
        args = ['rhizome', 'extract', 'manifest', bid, manifest_path]
    if not args:
        return None
    try:
        status, out = invoke_servald(opts, args, output_keyvalue=True)
    except ServaldInterfaceException as e:
        error(e)
        return None
    return status == 0


def invoke_servald(opts, args, output_keyvalue=False, output_words=False):
    """Run servald with `args` and return (exit status, output).

    The output is the raw standard output text by default, a list of words
    if `output_words` is set, or a dict if `output_keyvalue` is set.  Returns
    (None, None) if servald cannot be executed.  Raises
    ServaldInterfaceException if servald exits with status 255 or its
    delimited output is malformed.
    """
    env = dict(os.environ)
    delim = None
    if output_words or output_keyvalue:
        delim = '\x01'
        env['SERVALD_OUTPUT_DELIMITER'] = delim
    if opts.instancepath:
        env['SERVALD_INSTANCEPATH'] = opts.instancepath
        # NOTE(review): main() takes its default instance path from
        # SERVALINSTANCE_PATH, so that name is exported as well -- confirm
        # which of the two servald actually reads.
        env['SERVALINSTANCE_PATH'] = opts.instancepath
    allargs = (opts.servald,) + tuple(args)
    log('execute ' + ' '.join(map(repr, allargs)))
    try:
        proc = subprocess.Popen(allargs,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                env=env)
        out, err = proc.communicate()
    except OSError as e:
        error('cannot execute %s - %s' % (opts.servald, e))
        return None, None
    # On Python 3 the pipes yield bytes; the parsing below works on text.
    if not isinstance(out, str):
        out = out.decode('utf-8', 'replace')
    if not isinstance(err, str):
        err = err.decode('utf-8', 'replace')
    if proc.returncode == 255:
        for line in err.split('\n'):
            if line.startswith(('ERROR:', 'WARN:')):
                # Strip the level, optional [pid] and timestamp prefix.
                error(re.sub(r'^(ERROR|WARN):\s*(\[\d+])?\s*\d\d\:\d\d\:\d\d\.\d+\s*', '', line))
        raise ServaldInterfaceException(opts.servald, proc.returncode, None, 'exited with error')
    if out is not None and delim is not None:
        if not out.endswith(delim):
            raise ServaldInterfaceException(opts.servald, proc.returncode, out, 'missing delimiter')
        out = out[:-len(delim)]
        words = out.split(delim)
        if output_keyvalue:
            if len(words) % 2 != 0:
                raise ServaldInterfaceException(opts.servald, proc.returncode, out, 'odd number of output fields')
            # Words alternate key, value, key, value, ...
            out = dict(zip(words[0::2], words[1::2]))
        else:
            out = words
    return proc.returncode, out


# Stream that log() writes to; None disables logging.
log_output = None


def log(msg):
    """Write an activity message to the log stream, if logging is enabled."""
    if log_output:
        log_output.write('+ %s\n' % (msg,))


def error(msg):
    """Write an error message, prefixed with the program name, to standard error."""
    sys.stderr.write('%s: %s\n' % (os.path.basename(sys.argv[0]), msg))


def fatal(msg):
    """Report `msg` as an error and exit with status 1."""
    error(msg)
    sys.exit(1)


if __name__ == '__main__':
    main()