mirror of
https://github.com/servalproject/serval-dna.git
synced 2025-01-11 15:32:51 +00:00
b4818c3d6f
Rhizome mirror daemon and Serval Maps push script handle errors better Added README.md
678 lines
27 KiB
Python
Executable File
678 lines
27 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
# Rhizome mirror daemon
|
|
# Copyright (C) 2013 Serval Project Inc.
|
|
#
|
|
# This program is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU General Public License
|
|
# as published by the Free Software Foundation; either version 2
|
|
# of the License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, write to the Free Software
|
|
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
|
|
|
"""This daemon regularly extracts file-sharing (service=file) bundles from
|
|
Rhizome into a mirror directory, and unpacks all bundles which are archive
|
|
format (zip, tar, tgz, etc.), into their constituent files. If a newer version
|
|
of an already-unpacked bundle arrives, the prior unpack is deleted and the new
|
|
version unpacked in its place.
|
|
|
|
This effectively maintains the directory as an up-to-date mirror of the local
|
|
Rhizome store content.
|
|
|
|
On-disk file/directory names are formed by concatenating the bundle name field
|
|
and the bundle ID separated by ':', in order to avoid collisions.
|
|
|
|
@author Andrew Bettison <andrew@servalproject.com>
|
|
"""
|
|
|
|
import sys
|
|
import errno
|
|
import time
|
|
import os
|
|
import os.path
|
|
import re
|
|
import argparse
|
|
import subprocess
|
|
import datetime
|
|
import shutil
|
|
import fnmatch
|
|
import zipfile
|
|
import tarfile
|
|
|
|
def main():
    """Parse the command line, check that servald works, then run the mirror loop.

    The loop lists Rhizome, updates the mirror, expires stale bundles and runs
    the --exec-on-unpack hook.  With --interval=0 (the default) it runs once;
    otherwise it sleeps for the interval between iterations.  Exits the
    process via sys.exit().
    """
    global log_output
    instancepath = os.environ.get('SERVALINSTANCE_PATH')
    parser = argparse.ArgumentParser(description='Continuously extract Rhizome store into mirror directory.')
    parser.add_argument('--mirror-dir',
                        dest='mirror_dir', metavar='PATH', required=True,
                        help='Path of directory to store extracted payloads')
    parser.add_argument('--servald',
                        dest='servald', metavar='PATH', required=True,
                        help='Path of servald executable')
    parser.add_argument('--instance',
                        dest='instancepath', metavar='PATH', required=not instancepath, default=instancepath,
                        help='Path of servald instance directory')
    parser.add_argument('--interval',
                        dest='interval', metavar='N', type=float, default=0,
                        help='Sleep N seconds between polling Rhizome; N=0 means run once and exit')
    parser.add_argument('--filter-name',
                        dest='name_filter', metavar='GLOB',
                        help='Only mirror bundles whose names match GLOB pattern')
    parser.add_argument('--unpack-dir',
                        dest='unpack_dir', metavar='PATH', default=None,
                        help='Path of directory in which to unpack bundles (zip, tar, etc.)')
    parser.add_argument('--exec-on-unpack',
                        dest='exec_on_unpack', metavar='EXECUTABLE',
                        help='Run EXECUTABLE after unpacking one or more bundles, arg1 = PATH (from --unpack-dir=PATH), arg2..n = names of unpacked directories in PATH')
    parser.add_argument('--expire-delay',
                        dest='expire_delay', metavar='N', type=int, default=0,
                        help='Keep bundles in mirror for N seconds after no longer listed by Rhizome')
    parser.add_argument('--error-retry',
                        dest='error_retry', metavar='N', type=int, default=600,
                        help='Wait N seconds before retrying failed operations')
    parser.add_argument('--paranoid',
                        dest='paranoid', action='store_true',
                        help='Continually check for and correct corrupted mirror contents')
    parser.add_argument('--log-to-stdout',
                        dest='log_to_stdout', action='store_true',
                        help='Log activity on standard output')
    opts = parser.parse_args()
    # The option is documented as logging to standard output.  error() always
    # writes to stderr as well, so logging to stderr here would print every
    # error message twice on the same stream.
    log_output = sys.stdout if opts.log_to_stdout else None
    # Sanity check: servald must be executable and answer 'help' successfully.
    status = output = None
    try:
        status, output = invoke_servald(opts, ['help'])
    except ServaldInterfaceException as e:
        fatal(e)
    if status is None:
        fatal('no executable servald')
    if status != 0 or output is None:
        fatal('faulty servald')
    mirror = RhizomeMirror(opts)
    mirror.clean_unpack_dir()
    mirror.seed()
    while True:
        mirror.list()
        mirror.update()
        mirror.expire()
        mirror.exec_on_unpack()
        if not opts.interval:
            break
        time.sleep(opts.interval)
    sys.exit(0)
|
|
|
|
class RhizomeMirror(object):
    """Keeps a directory of payload files (and optional unpacked archives) in step with Rhizome.

    Attributes:
      opts            parsed command-line options
      hash_errors     bundle ID -> time.time() of the last failed payload hash
      extract_errors  bundle ID -> time.time() of the last failed extraction
      extracted       stem -> manifest held in the mirror (None once removed)
      available       stem -> manifest from the latest listing, or None if
                      the listing failed
      unpacked        unpack directories created since exec_on_unpack() ran
    """

    def __init__(self, opts):
        self.opts = opts
        self.hash_errors = {}
        self.extract_errors = {}
        self.extracted = {}
        self.available = None
        self.unpacked = set()
        self.payloads_path = opts.mirror_dir
        self.manifests_path = os.path.join(self.payloads_path, '.manifests')
        self.unpacks_path = opts.unpack_dir

    # ---- path helpers ----

    def manifest_path(self, manifest):
        """Return the path of the stored manifest file for MANIFEST."""
        return os.path.join(self.manifests_path, manifest.stem()) + '.manifest'

    def payload_path(self, manifest):
        """Return the path of the extracted payload file for MANIFEST."""
        return os.path.join(self.payloads_path, manifest.stem()) + manifest.suffix()

    def unpack_path(self, manifest):
        """Return the unpack directory for MANIFEST, or None if unpacking is disabled."""
        if not self.unpacks_path:
            return None
        return os.path.join(self.unpacks_path, manifest.stem())

    def unpack_path_tmp(self, manifest, var):
        """Return a '.tmp-VAR-...' sibling of the unpack directory, or None if unpacking is disabled."""
        unpack_path = self.unpack_path(manifest)
        if not unpack_path:
            return None
        head, tail = os.path.split(unpack_path)
        return os.path.join(head, '.tmp-%s-%s' % (var, tail))

    # ---- internal helpers ----

    def _read_manifest(self, path):
        """Parse the manifest file at PATH; return None (after logging) if unreadable or invalid."""
        try:
            with open(path) as f:
                return RhizomeManifest.from_file(f)
        except (IOError, OSError) as e:
            error('cannot read manifest %r - %s' % (path, e))
        except (KeyError, ValueError) as e:
            error('invalid manifest %r - %s' % (path, e))
        return None

    def _hash_payload(self, manifest, payload_path):
        """Return the hash of the payload file, or None if it cannot be determined.

        After a failure, no new attempt is made for this bundle until
        opts.error_retry seconds have elapsed.
        """
        if self.hash_errors.get(manifest.id, 0) + self.opts.error_retry >= time.time():
            return None
        payload_hash = None
        try:
            payload_hash = servald_rhizome_hash(self.opts, payload_path)
        except ServaldInterfaceException as e:
            error(e)
        if payload_hash is None:
            self.hash_errors[manifest.id] = time.time()
        return payload_hash

    def _ensure_unpacked(self, manifest):
        """Unpack MANIFEST's payload if unpacking is enabled and its directory is missing."""
        unpack_dir = self.unpack_path(manifest)
        if unpack_dir and not os.path.isdir(unpack_dir):
            self.unpack_payload(manifest)

    @staticmethod
    def _is_safe_member(name):
        """Return True if archive member NAME stays inside the unpack directory."""
        if not name or name.startswith('/') or os.path.isabs(name):
            return False
        return '..' not in name.split('/')

    # ---- mirror operations ----

    def clean_unpack_dir(self):
        """Remove leftover '.tmp-*' entries from the unpack directory."""
        if self.unpacks_path and os.path.isdir(self.unpacks_path):
            for name in os.listdir(self.unpacks_path):
                if name.startswith('.tmp-'):
                    path = os.path.join(self.unpacks_path, name)
                    # Temporary unpack locations are directories, which
                    # os.unlink() cannot remove.
                    if os.path.isdir(path) and not os.path.islink(path):
                        self.rmtree(path)
                    else:
                        self.unlink(path)

    def seed(self):
        """Rebuild self.extracted from the manifest files already in the mirror."""
        self.extracted = {}
        if self.mkdirs(self.manifests_path) is not None:
            for manifest_name in os.listdir(self.manifests_path):
                manifest_path = os.path.join(self.manifests_path, manifest_name)
                if manifest_name.endswith('.manifest'):
                    stem = os.path.splitext(manifest_name)[0]
                    manifest = self._read_manifest(manifest_path)
                    if manifest is not None and stem == manifest.stem():
                        if self.sync(manifest):
                            log('seeded %r' % (stem,))
                            self.extracted[stem] = manifest
                        continue
                # Remove invalid manifests.
                self.unlink(manifest_path)

    def sync(self, manifest):
        """Return True if the mirrored payload already matches MANIFEST.

        An existing payload whose hash differs from the manifest is deleted.
        Returns False whenever the payload has to be (re-)extracted or its
        state cannot be determined.
        """
        payload_path = self.payload_path(manifest)
        if manifest.filesize == 0:
            self.unlink(payload_path)
            return True
        if os.path.exists(payload_path):
            payload_hash = self._hash_payload(manifest, payload_path)
            if payload_hash is None:
                # Can't tell if payload matches manifest or not.
                pass
            elif payload_hash == manifest.filehash:
                # This logic is DEFECTIVE for the case of encrypted payload, in which the hash is
                # for the ciphertext, not the clear text, but we are extracting the clear text, so
                # the hash will never match.
                self._ensure_unpacked(manifest)
                return True
            else:
                # Remove payload that does not match its manifest.
                self.unlink(payload_path)
        # Not synced -- have to extract the payload.
        return False

    def filter(self, manifest):
        """Return True if MANIFEST passes the --filter-name glob (or no filter is set)."""
        if self.opts.name_filter:
            if not fnmatch.fnmatch(manifest.name or '', self.opts.name_filter):
                return False
        return True

    def list(self):
        """Refresh self.available from Rhizome; leave it None if the listing fails."""
        self.available = None
        entries = servald_rhizome_list(self.opts)
        if entries is not None:
            self.available = {}
            for ent in entries:
                try:
                    manifest = RhizomeManifest.from_list_entry(ent)
                except (KeyError, ValueError) as e:
                    # One malformed row must not stop the whole daemon.
                    error('ignoring invalid list entry %r - %s' % (ent, e))
                    continue
                if manifest is not None and manifest.service == 'file':
                    self.available[manifest.stem()] = manifest

    def update(self):
        """Extract every available bundle that is new, newer or damaged in the mirror."""
        if self.available is None:
            return
        for stem, manifest in self.available.items():
            if not self.filter(manifest):
                continue
            manifest_path = self.manifest_path(manifest)
            payload_path = self.payload_path(manifest)
            extracted_manifest = self.extracted.get(stem)
            # kwargs collects which files servald must (re-)extract.
            kwargs = {}
            if extracted_manifest is None or manifest.succeeds(extracted_manifest):
                kwargs['manifest_path'] = manifest_path
                if manifest.filesize == 0:
                    self.unlink(payload_path)
                else:
                    kwargs['payload_path'] = payload_path
            elif manifest.id == extracted_manifest.id:
                # Assume manifest and payload files are correct if present, unless
                # in 'paranoid' mode.
                if manifest.filesize == 0:
                    self.unlink(payload_path)
                elif os.path.exists(payload_path):
                    payload_hash = None
                    if self.opts.paranoid:
                        payload_hash = self._hash_payload(manifest, payload_path)
                    if payload_hash is None or payload_hash == manifest.filehash:
                        # Payload is consistent with manifest.  Check that it is
                        # unpacked.
                        self._ensure_unpacked(manifest)
                    else:
                        # This logic is DEFECTIVE for the case of encrypted payload, in
                        # which the hash is for the ciphertext, not the clear text, but
                        # we are extracting the clear text, so the hash will never
                        # match.
                        kwargs['payload_path'] = payload_path
                else:
                    kwargs['payload_path'] = payload_path
                if os.path.exists(manifest_path):
                    self.touch(manifest_path) # Remember when this was last available
                    if self.opts.paranoid:
                        check_manifest = self._read_manifest(manifest_path)
                        if check_manifest is None or check_manifest != extracted_manifest:
                            kwargs['manifest_path'] = manifest_path
                else:
                    kwargs['manifest_path'] = manifest_path
            else:
                # Ignore listed manifests with the same stem but different bundle ID; keep
                # the already-extracted bundle (until expired).
                pass
            if not kwargs:
                continue
            # Back off after a failed extraction of this bundle.
            if self.extract_errors.get(manifest.id, 0) + self.opts.error_retry >= time.time():
                continue
            extracted = servald_rhizome_extract(self.opts, manifest.id, **kwargs)
            if extracted is None:
                self.extract_errors[manifest.id] = time.time()
                continue
            if not extracted:
                continue
            extracted_manifest = self._read_manifest(manifest_path)
            if (extracted_manifest is None
                    or extracted_manifest.id != manifest.id
                    or extracted_manifest.version != manifest.version):
                error('invalid manifest extracted for bid=%s' % (manifest.id,))
                self.unlink(payload_path)
                self.unlink(manifest_path)
                self.extracted[stem] = None
                self.extract_errors[manifest.id] = time.time()
            else:
                self.extracted[stem] = extracted_manifest
                if 'payload_path' in kwargs:
                    self.unpack_payload(manifest)

    def unpack_payload(self, manifest):
        """Unpack MANIFEST's payload (zip or tar) into its unpack directory.

        The archive is unpacked into a temporary sibling directory which then
        replaces any existing unpack directory.  Members with absolute paths
        or '..' components are skipped, as are tar members that are neither
        regular files nor directories, because bundle content is untrusted.

        Returns True if a new directory was put in place, None if the archive
        could not be unpacked, False otherwise (unpacking disabled, payload is
        not an archive, or a filesystem operation failed).
        """
        unpack_dir = self.unpack_path(manifest)
        if not unpack_dir:
            # No --unpack-dir was given, so there is nowhere to unpack to.
            return False
        unpack_dir_new = self.unpack_path_tmp(manifest, 'new')
        unpack_dir_old = self.unpack_path_tmp(manifest, 'old')
        self.rmtree(unpack_dir_new)
        if self.mkdirs(unpack_dir_new) is None:
            return False
        payload_path = self.payload_path(manifest)
        try:
            if zipfile.is_zipfile(payload_path):
                log('unzip %s into %s' % (payload_path, unpack_dir_new))
                with zipfile.ZipFile(payload_path) as zf:
                    for name in zf.namelist():
                        if self._is_safe_member(name):
                            log(' extract %s' % name)
                            zf.extract(name, unpack_dir_new)
                        else:
                            log(' skip %s' % name)
            elif tarfile.is_tarfile(payload_path):
                log('untar %s into %s' % (payload_path, unpack_dir_new))
                with tarfile.open(payload_path) as tf:
                    for member in tf.getmembers():
                        name = member.name
                        # Links and device nodes could be used to write
                        # outside the unpack directory.
                        if self._is_safe_member(name) and (member.isfile() or member.isdir()):
                            log(' extract %s' % name)
                            tf.extract(member, unpack_dir_new)
                        else:
                            log(' skip %s' % name)
            else:
                return False
            if os.path.exists(unpack_dir):
                self.rename(unpack_dir, unpack_dir_old)
            if self.rename(unpack_dir_new, unpack_dir):
                self.unpacked.add(unpack_dir)
                return True
        except zipfile.BadZipfile as e:
            error('cannot unzip %s: %s' % (payload_path, e))
            return None
        except tarfile.TarError as e:
            error('cannot untar %s: %s' % (payload_path, e))
            return None
        except (IOError, OSError) as e:
            error('cannot unpack %s: %s' % (payload_path, e))
            return None
        finally:
            self.rmtree(unpack_dir_new)
            self.rmtree(unpack_dir_old)
        return False

    def expire(self):
        """Delete mirrored bundles that Rhizome has not listed for opts.expire_delay seconds."""
        now = time.time()
        if self.available is None:
            return
        for stem, extracted_manifest in self.extracted.items():
            if extracted_manifest is None or stem in self.available:
                continue
            manifest_path = self.manifest_path(extracted_manifest)
            payload_path = self.payload_path(extracted_manifest)
            if os.path.exists(manifest_path):
                # The manifest file's mtime records when the bundle was last
                # seen in a listing (see touch() in update()).
                last_seen = self.mtime(manifest_path)
                if last_seen is not None and last_seen + self.opts.expire_delay < now:
                    self.unlink(payload_path)
                    self.unlink(manifest_path)
                    self.extracted[stem] = None
            else:
                self.unlink(payload_path)

    def exec_on_unpack(self):
        """Run the --exec-on-unpack program on the directories unpacked since the last call."""
        if self.unpacked and self.opts.exec_on_unpack:
            env = dict(os.environ)
            if log_output:
                env['RHIZOME_MIRRORD_LOG_STDOUT'] = 'true'
            args = [self.opts.exec_on_unpack, self.unpacks_path] + [os.path.relpath(p, self.unpacks_path) for p in sorted(self.unpacked)]
            log("execute " + ' '.join(args))
            try:
                with open(os.devnull) as devnull:
                    stat = subprocess.call(args, stdin=devnull, env=env)
                if stat != 0:
                    error('exec-on-unpack script %r failed - error status %d' % (self.opts.exec_on_unpack, stat))
            except (IOError, OSError) as e:
                error('cannot execute %r - %s' % (self.opts.exec_on_unpack, e))
        self.unpacked.clear()

    # ---- filesystem primitives: True = done, False = nothing to do, None = error (logged) ----

    def mkdirs(self, path):
        """Create directory PATH and any missing parents."""
        if os.path.isdir(path):
            return False
        log('mkdirs %r' % (path,))
        try:
            os.makedirs(path)
            return True
        except OSError as e:
            error('cannot mkdir -p %r - %s' % (path, e))
            return None

    def rmtree(self, path):
        """Recursively delete directory PATH."""
        if not os.path.exists(path):
            return False
        log('rmdirs %r' % (path,))
        try:
            shutil.rmtree(path)
            return True
        except OSError as e:
            error('cannot rm -r %r - %s' % (path, e))
            return None

    def touch(self, path):
        """Set the modification time of existing file PATH to now."""
        try:
            # Merely opening and closing the file does not change its mtime.
            os.utime(path, None)
        except OSError as e:
            error('cannot touch %r - %s' % (path, e))

    def rename(self, path, newpath):
        """Rename PATH to NEWPATH."""
        if not os.path.exists(path):
            return False
        log('rename %r %r' % (path, newpath))
        try:
            os.rename(path, newpath)
            return True
        except OSError as e:
            error('cannot rename %r to %r - %s' % (path, newpath, e))
            return None

    def unlink(self, path):
        """Delete file PATH."""
        if not os.path.exists(path):
            return False
        log('unlink %r' % (path,))
        try:
            os.unlink(path)
            return True
        except OSError as e:
            error('cannot unlink %r - %s' % (path, e))
            return None

    def mtime(self, path):
        """Return the modification time of PATH, or None if it cannot be read."""
        try:
            return os.stat(path).st_mtime
        except OSError as e:
            if e.errno != errno.ENOENT:
                error('cannot stat %r - %s' % (path, e))
            return None
|
|
|
|
class RhizomeManifest(object):
    """The fields of one Rhizome bundle manifest.

    The constructor takes the manifest fields as keyword arguments (all
    values as text).  'service', 'id' and 'version' are required; 'filehash'
    is required unless 'filesize' is present and zero.  Raises KeyError for a
    missing required field and ValueError for a malformed value.
    """

    def __init__(self, **fields):
        self.service = str_nonempty(fields['service'])
        self.id = bundle_id(fields['id'])
        self.version = ulong(fields['version'])
        self.filesize = ulong(fields['filesize']) if 'filesize' in fields else None
        self.date = time_ms(fields['date']) if 'date' in fields else None
        # A zero-length payload has no hash.
        self.filehash = file_hash(fields['filehash']) if self.filesize or self.filesize is None else None
        self.sender = subscriber_id(fields['sender']) if 'sender' in fields else None
        self.recipient = subscriber_id(fields['recipient']) if 'recipient' in fields else None
        self.name = str(fields['name']) if 'name' in fields else None
        self._other = fields

    @staticmethod
    def fieldname(text):
        """Return TEXT lower-cased if it is alphanumeric; raise ValueError otherwise."""
        if text.isalnum():
            return text.lower()
        raise ValueError('invalid literal for RhizomeManifest.fieldname(): %s' % (text,))

    @classmethod
    def is_fieldname(cls, text):
        """Return True if TEXT is accepted by fieldname()."""
        try:
            cls.fieldname(text)
            return True
        except ValueError:
            return False

    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return NotImplemented
        return (self.service == other.service
            and self.id == other.id
            and self.version == other.version
            and self.filesize == other.filesize
            and self.date == other.date
            and self.filehash == other.filehash
            and self.sender == other.sender
            and self.recipient == other.recipient
            and self.name == other.name
            and self._other == other._other)

    def __ne__(self, other):
        if not isinstance(other, type(self)):
            return NotImplemented
        return not self.__eq__(other)

    def _safe_name(self):
        """Return the bundle name made safe for use as a single path component.

        The name comes from the bundle itself, so path separators and NUL
        bytes are replaced to keep derived paths inside the mirror directory.
        A manifest without a name yields the empty string.
        """
        name = self.name or ''
        for sep in set(['/', os.sep, os.altsep, '\0']):
            if sep:
                name = name.replace(sep, '_')
        return name

    def stem(self):
        """Return '<name without extension>:<first 12 characters of the bundle ID>'."""
        return os.path.splitext(self._safe_name())[0] + ':' + self.id[:12]

    def suffix(self):
        """Return the extension of the bundle name ('' if there is none)."""
        return os.path.splitext(self._safe_name())[1]

    @classmethod
    def from_list_entry(cls, ent):
        """Build a manifest from a list entry's attributes; return None if they are invalid.

        Only attributes with a valid field name and a non-empty value are used.
        """
        fields = dict((key, value) for key, value in ent.__dict__.items()
                      if cls.is_fieldname(key) and len(value))
        try:
            return cls(**fields)
        except (KeyError, ValueError):
            return None

    @classmethod
    def from_file(cls, f):
        """Parse a manifest from open file F; return None if it is not a valid manifest.

        The text before the first NUL byte is read as 'field=value' lines;
        whatever follows the NUL is ignored here.
        """
        body, separator, _signature = f.read().partition('\0')
        if not separator:
            return None
        if body.endswith('\n'):
            body = body[:-1]
        fields = {}
        try:
            for line in body.split('\n'):
                field, value = line.split('=', 1)
                fields[cls.fieldname(field)] = value
            return cls(**fields)
        except (KeyError, ValueError):
            return None

    def succeeds(self, other):
        """Return True if this manifest is a newer version of the same bundle as OTHER."""
        return self.id == other.id and self.version > other.version
|
|
|
|
def ulong(text):
    """Convert TEXT to a non-negative long integer.

    Raises ValueError if TEXT is not an integer literal or is negative.
    """
    value = long(text)
    if value < 0:
        raise ValueError('invalid literal for ulong(): %s' % (text,))
    return value
|
|
|
|
def str_nonempty(text):
    """Return str(TEXT); raise ValueError if the result is the empty string."""
    result = str(text)
    if not result:
        raise ValueError('invalid literal for str_nonempty(): %s' % (text,))
    return result
|
|
|
|
def time_ms(text):
    """Convert TEXT to a long integer; the name suggests a time in milliseconds.

    Raises ValueError if TEXT is not an integer literal.
    """
    try:
        value = long(text)
    except ValueError:
        raise ValueError('invalid literal for time_ms(): %r' % (text,))
    return value
|
|
|
|
def datetime_ms(text):
    """Convert TEXT, a count of milliseconds since the epoch, to a datetime.

    The result is a naive datetime in local time, as produced by
    datetime.datetime.fromtimestamp().  Raises ValueError if TEXT is not an
    integer literal or fromtimestamp() rejects the value with ValueError.
    """
    try:
        ms = time_ms(text)
        # 'datetime' is the module here, so the class must be qualified.
        # The sub-second remainder is in milliseconds and has to be scaled to
        # the microseconds that datetime stores.
        whole_seconds = datetime.datetime.fromtimestamp(ms // 1000)
        return whole_seconds.replace(microsecond=(ms % 1000) * 1000)
    except ValueError:
        pass
    raise ValueError('invalid literal for datetime_ms(): %r' % (text,))
|
|
|
|
def subscriber_id(text):
    """Validate TEXT as a subscriber ID and return it in upper case.

    TEXT must consist of exactly 64 hexadecimal digits.  Forms that the
    integer parser would tolerate (sign, '0x' prefix, surrounding whitespace)
    are rejected.  Raises ValueError for anything else.
    """
    if re.match(r'[0-9A-Fa-f]{64}\Z', text):
        return str(text.upper())
    raise ValueError('invalid literal for subscriber_id(): %r' % (text,))
|
|
|
|
def bundle_id(text):
    """Validate TEXT as a bundle ID and return it in upper case.

    TEXT must consist of exactly 64 hexadecimal digits.  Forms that the
    integer parser would tolerate (sign, '0x' prefix, surrounding whitespace)
    are rejected.  Raises ValueError for anything else.
    """
    if re.match(r'[0-9A-Fa-f]{64}\Z', text):
        return str(text.upper())
    raise ValueError('invalid literal for bundle_id(): %r' % (text,))
|
|
|
|
def file_hash(text):
    """Validate TEXT as a payload hash and return it in upper case.

    TEXT must consist of exactly 128 hexadecimal digits.  Forms that the
    integer parser would tolerate (sign, '0x' prefix, surrounding whitespace)
    are rejected.  Raises ValueError for anything else.
    """
    if re.match(r'[0-9A-Fa-f]{128}\Z', text):
        return str(text.upper())
    raise ValueError('invalid literal for file_hash(): %r' % (text,))
|
|
|
|
class ServaldInterfaceException(Exception):
    """Raised when servald exits with an error or produces unusable output.

    Carries the servald path, its exit status and the output it produced.
    """

    def __init__(self, servald, status, output, msg):
        message = '%s (exit status %d): %s' % (servald, status, msg)
        Exception.__init__(self, message)
        self.servald = servald
        self.status = status
        self.output = output
|
|
|
|
class RhizomeListEntry(object):
    """A plain record whose attributes are the keyword arguments it was built with."""

    def __init__(self, **kwargs):
        vars(self).update(kwargs)

    def __repr__(self):
        attributes = ['%s=%r' % (key, value) for key, value in vars(self).items()]
        return '%s(%s)' % (type(self).__name__, ', '.join(attributes))
|
|
|
|
def servald_rhizome_list(opts):
    """Run 'servald rhizome list file' and return its rows as RhizomeListEntry objects.

    The output is read as a word list: a column count, then that many column
    headers, then the row values.  Each row becomes an entry whose attribute
    names are the headers with every character outside [A-Za-z0-9_] replaced
    by '_'.

    Returns None (after logging) if servald fails or its output is malformed.
    """
    args = ['rhizome', 'list', 'file']
    try:
        status, words = invoke_servald(opts, args, output_words=True)
    except ServaldInterfaceException as e:
        error(e)
        return None
    if words is None:
        return None
    try:
        if len(words) < 1:
            raise ValueError('missing first word')
        ncols = int(words[0])
        # A zero count would divide by zero below; a negative one is nonsense.
        if ncols < 1:
            raise ValueError('invalid column count %d' % (ncols,))
        if len(words) < 1 + ncols:
            raise ValueError('missing column header')
        if (len(words) - (1 + ncols)) % ncols != 0:
            raise ValueError('incomplete row')
    except ValueError as e:
        error('invalid output from %s: %s' % (' '.join([opts.servald,] + args), e))
        return None
    colnames = [re.sub(r'[^A-Za-z0-9_]', '_', hdr) for hdr in words[1:ncols + 1]]
    rows = []
    for start in range(ncols + 1, len(words), ncols):
        ent = RhizomeListEntry()
        for colname, word in zip(colnames, words[start:start + ncols]):
            setattr(ent, colname, word)
        rows.append(ent)
    return rows
|
|
|
|
def servald_rhizome_hash(opts, path):
    """Run 'servald rhizome hash file PATH' and return the resulting hash.

    Returns the hash normalised by file_hash(), or None (after logging) if
    servald fails or prints something that is not a valid hash.  Callers
    treat None as "could not hash", so no exception is raised for bad output.
    """
    args = ['rhizome', 'hash', 'file', path]
    try:
        status, out = invoke_servald(opts, args)
    except ServaldInterfaceException as e:
        error(e)
        return None
    if out is None:
        return None
    if out.endswith('\n'):
        out = out[:-1]
    try:
        return file_hash(out)
    except ValueError:
        error(ServaldInterfaceException(opts.servald, status, out, 'invalid output, not a hex file hash'))
        return None
|
|
|
|
def servald_rhizome_extract(opts, bid, manifest_path=None, payload_path=None):
    """Ask servald to extract bundle BID's manifest and/or payload to the given paths.

    The servald sub-command depends on which paths are given: 'bundle' for
    both, 'file' for the payload only, 'manifest' for the manifest only.

    Returns True if servald exited with status 0 and False for any other
    exit status.  Returns None if no path was given, servald could not be
    executed, or it reported an error.
    """
    if payload_path and manifest_path:
        args = ['rhizome', 'extract', 'bundle', bid, manifest_path, payload_path]
    elif payload_path:
        args = ['rhizome', 'extract', 'file', bid, payload_path]
    elif manifest_path:
        args = ['rhizome', 'extract', 'manifest', bid, manifest_path]
    else:
        return None
    try:
        status, out = invoke_servald(opts, args, output_keyvalue=True)
    except ServaldInterfaceException as e:
        error(e)
        return None
    if status is None:
        # servald could not be executed at all: report an error, not "no".
        return None
    return status == 0
|
|
|
|
def invoke_servald(opts, args, output_keyvalue=False, output_words=False):
    """Run servald with ARGS and return (exit status, output).

    With neither flag set, the output is servald's raw standard output.  With
    output_words, it is the list of delimiter-terminated words that servald
    printed.  With output_keyvalue, those words are paired up into a dict.

    Returns (None, None) if servald could not be executed.  Raises
    ServaldInterfaceException if servald exits with status 255 (its ERROR and
    WARN lines are logged first) or if structured output is malformed.
    """
    delim = '\x01'
    structured = output_words or output_keyvalue
    env = dict(os.environ)
    if structured:
        env['SERVALD_OUTPUT_DELIMITER'] = delim
    env['SERVALINSTANCE_PATH'] = opts.instancepath
    allargs = (opts.servald,) + tuple(args)
    log('execute ' + ' '.join(map(repr, allargs)))
    try:
        proc = subprocess.Popen(allargs,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                env=env)
        out, err = proc.communicate()
    except OSError as e:
        error('cannot execute %s - %s' % (opts.servald, e))
        return None, None
    if proc.returncode == 255:
        for line in err.split('\n'):
            if line.startswith(('ERROR:', 'WARN:')):
                # Strip the level, optional [pid] and timestamp prefix.
                error(re.sub(r'^(ERROR|WARN):\s*(\[\d+])?\s*\d\d\:\d\d\:\d\d\.\d+\s*', '', line))
        raise ServaldInterfaceException(opts.servald, proc.returncode, None, 'exited with error')
    if out is not None and structured:
        if not out.endswith(delim):
            raise ServaldInterfaceException(opts.servald, proc.returncode, out, 'missing delimiter')
        out = out[:-len(delim)]
        words = out.split(delim)
        if output_keyvalue:
            if len(words) % 2 != 0:
                raise ServaldInterfaceException(opts.servald, proc.returncode, out, 'odd number of output fields')
            # Words alternate key, value, key, value, ...
            out = dict(zip(words[0::2], words[1::2]))
        else:
            out = words
    return proc.returncode, out
|
|
|
|
# Stream that log() and error() write their '+ ...' activity lines to.
# None disables activity logging; main() sets it from --log-to-stdout.
log_output = None
|
|
|
|
def log(msg):
    """Write MSG as a '+ ...' line to the activity log, if logging is enabled."""
    if not log_output:
        return
    log_output.write('+ %s\n' % (msg,))
|
|
|
|
def error(msg):
    """Report MSG on standard error, prefixed with the program name.

    If activity logging is enabled, the message is also recorded there.
    """
    if log_output:
        log_output.write('+ error(%r)\n' % (str(msg),))
    program = os.path.basename(sys.argv[0])
    sys.stderr.write('%s: %s\n' % (program, msg))
|
|
|
|
def fatal(msg):
    """Report MSG as a fatal error and terminate the process with exit status 1."""
    error('FATAL: %s' % (msg,))
    raise SystemExit(1)
|
|
|
|
# Run the daemon only when this file is executed as a script.
if __name__ == '__main__':
    main()
|