From 7e206d5930e14d8cd3efd7ba2b48597b9b4aea8f Mon Sep 17 00:00:00 2001 From: Andrew Bettison Date: Thu, 21 Feb 2013 22:42:21 +1030 Subject: [PATCH] Improve Rhizome mirror daemon script New options: --unpack-dir --exec-on-unpack --log-to-stdout Unpacks Zip and Tar files after they are extracted from Rhizome Runs a given command (executable) after unpacking any Zip or Tar files --- utilities/rhizome_mirrord | 212 ++++++++++++++++++++++++++++++-------- 1 file changed, 170 insertions(+), 42 deletions(-) diff --git a/utilities/rhizome_mirrord b/utilities/rhizome_mirrord index 8d3888c2..9b3f95db 100755 --- a/utilities/rhizome_mirrord +++ b/utilities/rhizome_mirrord @@ -41,21 +41,28 @@ import re import argparse import subprocess import datetime +import shutil import fnmatch +import zipfile +import tarfile def main(): + instancepath = os.environ.get('SERVALINSTANCE_PATH') parser = argparse.ArgumentParser(description='Continuously extract Rhizome store into mirror directory.') - parser.add_argument('--mirror-dir', dest='mirror_dir', required=True, help='Path of directory to store extracted payloads') - parser.add_argument('--interval', dest='interval', type=float, default=0, help='Seconds to sleep between polling Rhizome') - parser.add_argument('--servald', dest='servald', default='servald', help='Path of servald executable') - parser.add_argument('--servald-instance', dest='instancepath', default=os.environ.get('SERVALINSTANCE_PATH'), help='Path of servald instance directory') - parser.add_argument('--filter-name', dest='name_filter', help='Only mirror bundles whose names match given Glob pattern') - parser.add_argument('--expire-delay', dest='expire_delay', type=int, default=0, help='Keep bundles in mirror for this many seconds after no longer listed by Rhizome') - parser.add_argument('--error-retry', dest='error_retry', type=int, default=600, help='Wait this many seconds before retrying failed operations') + parser.add_argument('--mirror-dir', dest='mirror_dir', metavar='PATH', required=True, help='Path of directory to store extracted payloads') + parser.add_argument('--servald', dest='servald', metavar='PATH', required=True, help='Path of servald executable') + parser.add_argument('--instance', dest='instancepath', metavar='PATH', required=not instancepath, default=instancepath, help='Path of servald instance directory') + parser.add_argument('--interval', dest='interval', metavar='N', type=float, default=0, help='Sleep N seconds between polling Rhizome; N=0 means run once and exit') + parser.add_argument('--filter-name', dest='name_filter', metavar='GLOB', help='Only mirror bundles whose names match GLOB pattern') + parser.add_argument('--unpack-dir', dest='unpack_dir', metavar='PATH', default=None, help='Path of directory in which to unpack bundles (zip, tar, etc.)') + parser.add_argument('--exec-on-unpack', dest='exec_on_unpack', metavar='EXECUTABLE', help='Run EXECUTABLE after unpacking one or more bundles') + parser.add_argument('--expire-delay', dest='expire_delay', metavar='N', type=int, default=0, help='Keep bundles in mirror for N seconds after no longer listed by Rhizome') + parser.add_argument('--error-retry', dest='error_retry', metavar='N', type=int, default=600, help='Wait N seconds before retrying failed operations') parser.add_argument('--paranoid', dest='paranoid', action='store_true', help='Continually check for and correct corrupted mirror contents') + parser.add_argument('--log-to-stdout', dest='log_to_stdout', action='store_true', help='Log activity on standard output') opts = parser.parse_args() - if opts.instancepath is None: - fatal('missing --servald-instance option') + global log_output + log_output = sys.stderr if opts.log_to_stdout else None try: status, output = invoke_servald(opts, ['help']) except ServaldInterfaceException, e: @@ -65,11 +72,21 @@ def main(): if status != 0 or output is None: fatal('faulty servald') mirror = RhizomeMirror(opts) + mirror.clean_unpack_dir() mirror.seed() while True: mirror.list() mirror.update() mirror.expire() + if mirror.unpacked: + if opts.exec_on_unpack: + args = [opts.exec_on_unpack] + sorted(mirror.unpacked) + log("execute " + ' '.join(args)) + try: + subprocess.call(args, stdin=open('/dev/null')) + mirror.unpacked.clear() + except OSError, e: + error('cannot execute %r - %s' % (opts.exec_on_unpack, e)) if not opts.interval: break time.sleep(opts.interval) @@ -83,8 +100,10 @@ class RhizomeMirror(object): self.extract_errors = {} self.extracted = {} self.available = None + self.unpacked = set() self.payloads_path = opts.mirror_dir self.manifests_path = os.path.join(self.payloads_path, '.manifests') + self.unpacks_path = opts.unpack_dir def manifest_path(self, manifest): return os.path.join(self.manifests_path, manifest.stem()) + '.manifest' @@ -92,26 +111,39 @@ class RhizomeMirror(object): def payload_path(self, manifest): return os.path.join(self.payloads_path, manifest.stem()) + manifest.suffix() + def unpack_path(self, manifest): + if not self.unpacks_path: + return None + return os.path.join(self.unpacks_path, manifest.stem()) + + def unpack_path_tmp(self, manifest, var): + unpack_path = self.unpack_path(manifest) + if not unpack_path: + return None + head, tail = os.path.split(unpack_path) + return os.path.join(head, '.tmp-%s-%s' % (var, tail)) + + def clean_unpack_dir(self): + if self.unpacks_path and os.path.isdir(self.unpacks_path): + for name in os.listdir(self.unpacks_path): + if name.startswith('.tmp-'): + self.unlink(os.path.join(self.unpacks_path, name)) + def seed(self): self.extracted = {} - try: - os.makedirs(self.manifests_path) - except OSError, e: - if e.errno != errno.EEXIST: - raise - for manifest_name in os.listdir(self.manifests_path): - manifest_path = os.path.join(self.manifests_path, manifest_name) - print manifest_path - if manifest_name.endswith('.manifest'): - stem = os.path.splitext(manifest_name)[0] - manifest = RhizomeManifest.from_file(file(manifest_path)) - if manifest is not None and stem == manifest.stem(): - if self.sync(manifest): - log('seeded %r' % (stem,)) - self.extracted[stem] = manifest - continue - # Remove invalid manifests. - self.unlink(manifest_path) + if self.mkdirs(self.manifests_path) is not None: + for manifest_name in os.listdir(self.manifests_path): + manifest_path = os.path.join(self.manifests_path, manifest_name) + if manifest_name.endswith('.manifest'): + stem = os.path.splitext(manifest_name)[0] + manifest = RhizomeManifest.from_file(file(manifest_path)) + if manifest is not None and stem == manifest.stem(): + if self.sync(manifest): + log('seeded %r' % (stem,)) + self.extracted[stem] = manifest + continue + # Remove invalid manifests. + self.unlink(manifest_path) def sync(self, manifest): payload_path = self.payload_path(manifest) @@ -131,6 +163,9 @@ class RhizomeMirror(object): # This logic is DEFECTIVE for the case of encrypted payload, in which the hash is # for the ciphertext, not the clear text, but we are extracting the clear text, so # the hash will never match. + unpack_dir = self.unpack_path(manifest) + if not os.path.isdir(unpack_dir): + self.unpack_payload(manifest) return True else: # Remove payload that does not match its manifest. @@ -175,18 +210,24 @@ class RhizomeMirror(object): if manifest.filesize == 0: self.unlink(payload_path) elif os.path.exists(payload_path): + payload_hash = None if self.opts.paranoid: - payload_hash = None if self.hash_errors.get(manifest.id, 0) + self.opts.error_retry < time.time(): payload_hash = servald_rhizome_hash(self.opts, payload_path) if payload_hash is None: self.hash_errors[manifest.id] = time.time() - if payload_hash is not None and payload_hash != manifest.filehash: - # This logic is DEFECTIVE for the case of encrypted payload, in - # which the hash is for the ciphertext, not the clear text, but - # we are extracting the clear text, so the hash will never - # match. - kwargs['payload_path'] = payload_path + if payload_hash is None or payload_hash == manifest.filehash: + # Payload is consistent with manifest. Check that it is + # unpacked. + unpack_dir = self.unpack_path(manifest) + if not os.path.isdir(unpack_dir): + self.unpack_payload(manifest) + else: + # This logic is DEFECTIVE for the case of encrypted payload, in + # which the hash is for the ciphertext, not the clear text, but + # we are extracting the clear text, so the hash will never + # match. + kwargs['payload_path'] = payload_path else: kwargs['payload_path'] = payload_path if os.path.exists(manifest_path): @@ -219,6 +260,54 @@ class RhizomeMirror(object): self.extract_errors[manifest.id] = time.time() else: self.extracted[stem] = extracted_manifest + if 'payload_path' in kwargs: + self.unpack_payload(manifest) + + def unpack_payload(self, manifest): + unpack_dir_new = self.unpack_path_tmp(manifest, 'new') + unpack_dir_old = self.unpack_path_tmp(manifest, 'old') + self.rmtree(unpack_dir_new) + if self.mkdirs(unpack_dir_new) is not None: + payload_path = self.payload_path(manifest) + try: + if zipfile.is_zipfile(payload_path): + log('unzip %s into %s' % (payload_path, unpack_dir_new)) + with zipfile.ZipFile(payload_path) as zf: + names = [] + for name in zf.namelist(): + if name.startswith('/') or '../' in name: + log(' skip %s' % name) + else: + log(' extract %s' % name) + zf.extract(name, unpack_dir_new) + elif tarfile.is_tarfile(payload_path): + log('untar %s into %s' % (payload_path, unpack_dir_new)) + with tarfile.open(payload_path) as tf: + names = [] + for name in tf.getnames(): + if name.startswith('/') or '../' in name: + log(' skip %s' % name) + else: + log(' extract %s' % name) + tf.extract(name, unpack_dir_new) + else: + return False + unpack_dir = self.unpack_path(manifest) + if os.path.exists(unpack_dir): + self.rename(unpack_dir, unpack_dir_old) + if self.rename(unpack_dir_new, unpack_dir): + self.unpacked.add(unpack_dir) + return True + except zipfile.BadZipfile, e: + error('cannot unzip %s: %s' % (payload_path, e)) + return None + except tarfile.TarError, e: + error('cannot untar %s: %s' % (payload_path, e)) + return None + finally: + self.rmtree(unpack_dir_new) + self.rmtree(unpack_dir_old) + return False def expire(self): now = time.time() @@ -235,19 +324,55 @@ class RhizomeMirror(object): else: self.unlink(payload_path) + def mkdirs(self, path): + if os.path.isdir(path): + return False + log('mkdirs %r' % (path,)) + try: + os.makedirs(path) + return True + except OSError, e: + error('cannot mkdir -p %r - %s' % (path, e)) + return None + + def rmtree(self, path): + if not os.path.exists(path): + return False + log('rmdirs %r' % (path,)) + try: + shutil.rmtree(path) + return True + except OSError, e: + error('cannot rm -r %r - %s' % (path, e)) + return None + def touch(self, path): try: - open(path, "r+") + open(path, "r+").close() except OSError, e: error('cannot touch %r - %s' % (path, e)) + def rename(self, path, newpath): + if not os.path.exists(path): + return False + log('rename %r %r' % (path, newpath)) + try: + os.rename(path, newpath) + return True + except OSError, e: + error('cannot rename %r to %r - %s' % (path, newpath, e)) + return None + def unlink(self, path): - if os.path.exists(path): - log('unlink %r' % (path,)) - try: - os.unlink(path) - except OSError, e: - error('cannot unlink %r - %s' % (path, e)) + if not os.path.exists(path): + return False + log('unlink %r' % (path,)) + try: + os.unlink(path) + return True + except OSError, e: + error('cannot unlink %r - %s' % (path, e)) + return None def mtime(self, path): try: @@ -503,8 +628,11 @@ def invoke_servald(opts, args, output_keyvalue=False, output_words=False): out = words return proc.returncode, out +log_output = None + def log(msg): - print '+ %s' % (msg,) + if log_output: + print >>log_output, '+ %s' % (msg,) def error(msg): print >>sys.stderr, '%s: %s' % (os.path.basename(sys.argv[0]), msg)