Improve Rhizome mirror daemon script

New options: --unpack-dir --exec-on-unpack --log-to-stdout

Unpacks Zip and Tar files after they are extracted from Rhizome

Runs a given command (executable) after unpacking any Zip or Tar files
This commit is contained in:
Andrew Bettison 2013-02-21 22:42:21 +10:30
parent 66e78194f9
commit 7e206d5930

View File

@ -41,21 +41,28 @@ import re
import argparse
import subprocess
import datetime
import shutil
import fnmatch
import zipfile
import tarfile
def main():
instancepath = os.environ.get('SERVALINSTANCE_PATH')
parser = argparse.ArgumentParser(description='Continuously extract Rhizome store into mirror directory.')
parser.add_argument('--mirror-dir', dest='mirror_dir', required=True, help='Path of directory to store extracted payloads')
parser.add_argument('--interval', dest='interval', type=float, default=0, help='Seconds to sleep between polling Rhizome')
parser.add_argument('--servald', dest='servald', default='servald', help='Path of servald executable')
parser.add_argument('--servald-instance', dest='instancepath', default=os.environ.get('SERVALINSTANCE_PATH'), help='Path of servald instance directory')
parser.add_argument('--filter-name', dest='name_filter', help='Only mirror bundles whose names match given Glob pattern')
parser.add_argument('--expire-delay', dest='expire_delay', type=int, default=0, help='Keep bundles in mirror for this many seconds after no longer listed by Rhizome')
parser.add_argument('--error-retry', dest='error_retry', type=int, default=600, help='Wait this many seconds before retrying failed operations')
parser.add_argument('--mirror-dir', dest='mirror_dir', metavar='PATH', required=True, help='Path of directory to store extracted payloads')
parser.add_argument('--servald', dest='servald', metavar='PATH', required=True, help='Path of servald executable')
parser.add_argument('--instance', dest='instancepath', metavar='PATH', required=not instancepath, default=instancepath, help='Path of servald instance directory')
parser.add_argument('--interval', dest='interval', metavar='N', type=float, default=0, help='Sleep N seconds between polling Rhizome; N=0 means run once and exit')
parser.add_argument('--filter-name', dest='name_filter', metavar='GLOB', help='Only mirror bundles whose names match GLOB pattern')
parser.add_argument('--unpack-dir', dest='unpack_dir', metavar='PATH', default=None, help='Path of directory in which to unpack bundles (zip, tar, etc.)')
parser.add_argument('--exec-on-unpack', dest='exec_on_unpack', metavar='EXECUTABLE', help='Run EXECUTABLE after unpacking one or more bundles')
parser.add_argument('--expire-delay', dest='expire_delay', metavar='N', type=int, default=0, help='Keep bundles in mirror for N seconds after no longer listed by Rhizome')
parser.add_argument('--error-retry', dest='error_retry', metavar='N', type=int, default=600, help='Wait N seconds before retrying failed operations')
parser.add_argument('--paranoid', dest='paranoid', action='store_true', help='Continually check for and correct corrupted mirror contents')
parser.add_argument('--log-to-stdout', dest='log_to_stdout', action='store_true', help='Log activity on standard output')
opts = parser.parse_args()
if opts.instancepath is None:
fatal('missing --servald-instance option')
global log_output
log_output = sys.stderr if opts.log_to_stdout else None
try:
status, output = invoke_servald(opts, ['help'])
except ServaldInterfaceException, e:
@ -65,11 +72,21 @@ def main():
if status != 0 or output is None:
fatal('faulty servald')
mirror = RhizomeMirror(opts)
mirror.clean_unpack_dir()
mirror.seed()
while True:
mirror.list()
mirror.update()
mirror.expire()
if mirror.unpacked:
if opts.exec_on_unpack:
args = [opts.exec_on_unpack] + sorted(mirror.unpacked)
log("execute " + ' '.join(args))
try:
subprocess.call(args, stdin=open('/dev/null'))
mirror.unpacked.clear()
except OSError, e:
error('cannot execute %r - %s' % (opts.exec_on_unpack, e))
if not opts.interval:
break
time.sleep(opts.interval)
@ -83,8 +100,10 @@ class RhizomeMirror(object):
self.extract_errors = {}
self.extracted = {}
self.available = None
self.unpacked = set()
self.payloads_path = opts.mirror_dir
self.manifests_path = os.path.join(self.payloads_path, '.manifests')
self.unpacks_path = opts.unpack_dir
def manifest_path(self, manifest):
return os.path.join(self.manifests_path, manifest.stem()) + '.manifest'
@ -92,26 +111,39 @@ class RhizomeMirror(object):
def payload_path(self, manifest):
return os.path.join(self.payloads_path, manifest.stem()) + manifest.suffix()
def unpack_path(self, manifest):
if not self.unpacks_path:
return None
return os.path.join(self.unpacks_path, manifest.stem())
def unpack_path_tmp(self, manifest, var):
unpack_path = self.unpack_path(manifest)
if not unpack_path:
return None
head, tail = os.path.split(unpack_path)
return os.path.join(head, '.tmp-%s-%s' % (var, tail))
def clean_unpack_dir(self):
if self.unpacks_path and os.path.isdir(self.unpacks_path):
for name in os.listdir(self.unpacks_path):
if name.startswith('.tmp-'):
self.unlink(os.path.join(self.unpacks_path, name))
def seed(self):
self.extracted = {}
try:
os.makedirs(self.manifests_path)
except OSError, e:
if e.errno != errno.EEXIST:
raise
for manifest_name in os.listdir(self.manifests_path):
manifest_path = os.path.join(self.manifests_path, manifest_name)
print manifest_path
if manifest_name.endswith('.manifest'):
stem = os.path.splitext(manifest_name)[0]
manifest = RhizomeManifest.from_file(file(manifest_path))
if manifest is not None and stem == manifest.stem():
if self.sync(manifest):
log('seeded %r' % (stem,))
self.extracted[stem] = manifest
continue
# Remove invalid manifests.
self.unlink(manifest_path)
if self.mkdirs(self.manifests_path) is not None:
for manifest_name in os.listdir(self.manifests_path):
manifest_path = os.path.join(self.manifests_path, manifest_name)
if manifest_name.endswith('.manifest'):
stem = os.path.splitext(manifest_name)[0]
manifest = RhizomeManifest.from_file(file(manifest_path))
if manifest is not None and stem == manifest.stem():
if self.sync(manifest):
log('seeded %r' % (stem,))
self.extracted[stem] = manifest
continue
# Remove invalid manifests.
self.unlink(manifest_path)
def sync(self, manifest):
payload_path = self.payload_path(manifest)
@ -131,6 +163,9 @@ class RhizomeMirror(object):
# This logic is DEFECTIVE for the case of encrypted payload, in which the hash is
# for the ciphertext, not the clear text, but we are extracting the clear text, so
# the hash will never match.
unpack_dir = self.unpack_path(manifest)
if not os.path.isdir(unpack_dir):
self.unpack_payload(manifest)
return True
else:
# Remove payload that does not match its manifest.
@ -175,18 +210,24 @@ class RhizomeMirror(object):
if manifest.filesize == 0:
self.unlink(payload_path)
elif os.path.exists(payload_path):
payload_hash = None
if self.opts.paranoid:
payload_hash = None
if self.hash_errors.get(manifest.id, 0) + self.opts.error_retry < time.time():
payload_hash = servald_rhizome_hash(self.opts, payload_path)
if payload_hash is None:
self.hash_errors[manifest.id] = time.time()
if payload_hash is not None and payload_hash != manifest.filehash:
# This logic is DEFECTIVE for the case of encrypted payload, in
# which the hash is for the ciphertext, not the clear text, but
# we are extracting the clear text, so the hash will never
# match.
kwargs['payload_path'] = payload_path
if payload_hash is None or payload_hash == manifest.filehash:
# Payload is consistent with manifest. Check that it is
# unpacked.
unpack_dir = self.unpack_path(manifest)
if not os.path.isdir(unpack_dir):
self.unpack_payload(manifest)
else:
# This logic is DEFECTIVE for the case of encrypted payload, in
# which the hash is for the ciphertext, not the clear text, but
# we are extracting the clear text, so the hash will never
# match.
kwargs['payload_path'] = payload_path
else:
kwargs['payload_path'] = payload_path
if os.path.exists(manifest_path):
@ -219,6 +260,54 @@ class RhizomeMirror(object):
self.extract_errors[manifest.id] = time.time()
else:
self.extracted[stem] = extracted_manifest
if 'payload_path' in kwargs:
self.unpack_payload(manifest)
def unpack_payload(self, manifest):
unpack_dir_new = self.unpack_path_tmp(manifest, 'new')
unpack_dir_old = self.unpack_path_tmp(manifest, 'old')
self.rmtree(unpack_dir_new)
if self.mkdirs(unpack_dir_new) is not None:
payload_path = self.payload_path(manifest)
try:
if zipfile.is_zipfile(payload_path):
log('unzip %s into %s' % (payload_path, unpack_dir_new))
with zipfile.ZipFile(payload_path) as zf:
names = []
for name in zf.namelist():
if name.startswith('/') or '../' in name:
log(' skip %s' % name)
else:
log(' extract %s' % name)
zf.extract(name, unpack_dir_new)
elif tarfile.is_tarfile(payload_path):
log('untar %s into %s' % (payload_path, unpack_dir_new))
with tarfile.open(payload_path) as tf:
names = []
for name in tf.getnames():
if name.startswith('/') or '../' in name:
log(' skip %s' % name)
else:
log(' extract %s' % name)
tf.extract(name, unpack_dir_new)
else:
return False
unpack_dir = self.unpack_path(manifest)
if os.path.exists(unpack_dir):
self.rename(unpack_dir, unpack_dir_old)
if self.rename(unpack_dir_new, unpack_dir):
self.unpacked.add(unpack_dir)
return True
except zipfile.BadZipfile, e:
error('cannot unzip %s: %s' % (payload_path, e))
return None
except tarfile.TarError, e:
error('cannot untar %s: %s' % (payload_path, e))
return None
finally:
self.rmtree(unpack_dir_new)
self.rmtree(unpack_dir_old)
return False
def expire(self):
now = time.time()
@ -235,19 +324,55 @@ class RhizomeMirror(object):
else:
self.unlink(payload_path)
def mkdirs(self, path):
if os.path.isdir(path):
return False
log('mkdirs %r' % (path,))
try:
os.makedirs(path)
return True
except OSError, e:
error('cannot mkdir -p %r - %s' % (path, e))
return None
def rmtree(self, path):
if not os.path.exists(path):
return False
log('rmdirs %r' % (path,))
try:
shutil.rmtree(path)
return True
except OSError, e:
error('cannot rm -r %r - %s' % (path, e))
return None
def touch(self, path):
try:
open(path, "r+")
open(path, "r+").close()
except OSError, e:
error('cannot touch %r - %s' % (path, e))
def rename(self, path, newpath):
if not os.path.exists(path):
return False
log('rename %r %r' % (path, newpath))
try:
os.rename(path, newpath)
return True
except OSError, e:
error('cannot rename %r to %r - %s' % (path, newpath, e))
return None
def unlink(self, path):
if os.path.exists(path):
log('unlink %r' % (path,))
try:
os.unlink(path)
except OSError, e:
error('cannot unlink %r - %s' % (path, e))
if not os.path.exists(path):
return False
log('unlink %r' % (path,))
try:
os.unlink(path)
return True
except OSError, e:
error('cannot unlink %r - %s' % (path, e))
return None
def mtime(self, path):
try:
@ -503,8 +628,11 @@ def invoke_servald(opts, args, output_keyvalue=False, output_words=False):
out = words
return proc.returncode, out
log_output = None
def log(msg):
print '+ %s' % (msg,)
if log_output:
print >>log_output, '+ %s' % (msg,)
def error(msg):
print >>sys.stderr, '%s: %s' % (os.path.basename(sys.argv[0]), msg)