mirror of
https://github.com/servalproject/serval-dna.git
synced 2025-02-07 11:30:18 +00:00
Improve Rhizome mirror daemon script
Deal with servald error conditions and commands that return status=1
This commit is contained in:
parent
90386ce1b1
commit
66e78194f9
@ -44,19 +44,26 @@ import datetime
|
|||||||
import fnmatch
|
import fnmatch
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
parser = argparse.ArgumentParser(description='Unpack Rhizome bundles into mirror directory.')
|
parser = argparse.ArgumentParser(description='Continuously extract Rhizome store into mirror directory.')
|
||||||
|
parser.add_argument('--mirror-dir', dest='mirror_dir', required=True, help='Path of directory to store extracted payloads')
|
||||||
|
parser.add_argument('--interval', dest='interval', type=float, default=0, help='Seconds to sleep between polling Rhizome')
|
||||||
parser.add_argument('--servald', dest='servald', default='servald', help='Path of servald executable')
|
parser.add_argument('--servald', dest='servald', default='servald', help='Path of servald executable')
|
||||||
parser.add_argument('--servald-instance', dest='instancepath', default=os.environ.get('SERVALINSTANCE_PATH'), help='Path of servald instance directory')
|
parser.add_argument('--servald-instance', dest='instancepath', default=os.environ.get('SERVALINSTANCE_PATH'), help='Path of servald instance directory')
|
||||||
parser.add_argument('--interval', dest='interval', type=float, default=0, help='Seconds to sleep between polling Rhizome')
|
|
||||||
parser.add_argument('--mirror-dir', dest='mirror_dir', required=True, help='Path of directory to store extracted payloads')
|
|
||||||
parser.add_argument('--filter-name', dest='name_filter', help='Only mirror bundles whose names match given Glob pattern')
|
parser.add_argument('--filter-name', dest='name_filter', help='Only mirror bundles whose names match given Glob pattern')
|
||||||
parser.add_argument('--expire-delay', dest='expire_delay', default=0, help='Keep bundles in mirror for this many seconds after no longer listed by Rhizome')
|
parser.add_argument('--expire-delay', dest='expire_delay', type=int, default=0, help='Keep bundles in mirror for this many seconds after no longer listed by Rhizome')
|
||||||
|
parser.add_argument('--error-retry', dest='error_retry', type=int, default=600, help='Wait this many seconds before retrying failed operations')
|
||||||
parser.add_argument('--paranoid', dest='paranoid', action='store_true', help='Continually check for and correct corrupted mirror contents')
|
parser.add_argument('--paranoid', dest='paranoid', action='store_true', help='Continually check for and correct corrupted mirror contents')
|
||||||
opts = parser.parse_args()
|
opts = parser.parse_args()
|
||||||
if opts.instancepath is None:
|
if opts.instancepath is None:
|
||||||
fatal('missing --servald-instance option')
|
fatal('missing --servald-instance option')
|
||||||
if invoke_servald(opts, ['help']) is None:
|
try:
|
||||||
|
status, output = invoke_servald(opts, ['help'])
|
||||||
|
except ServaldInterfaceException, e:
|
||||||
|
fatal(e)
|
||||||
|
if status is None:
|
||||||
fatal('no executable servald')
|
fatal('no executable servald')
|
||||||
|
if status != 0 or output is None:
|
||||||
|
fatal('faulty servald')
|
||||||
mirror = RhizomeMirror(opts)
|
mirror = RhizomeMirror(opts)
|
||||||
mirror.seed()
|
mirror.seed()
|
||||||
while True:
|
while True:
|
||||||
@ -72,6 +79,8 @@ class RhizomeMirror(object):
|
|||||||
|
|
||||||
def __init__(self, opts):
|
def __init__(self, opts):
|
||||||
self.opts = opts
|
self.opts = opts
|
||||||
|
self.hash_errors = {}
|
||||||
|
self.extract_errors = {}
|
||||||
self.extracted = {}
|
self.extracted = {}
|
||||||
self.available = None
|
self.available = None
|
||||||
self.payloads_path = opts.mirror_dir
|
self.payloads_path = opts.mirror_dir
|
||||||
@ -92,6 +101,7 @@ class RhizomeMirror(object):
|
|||||||
raise
|
raise
|
||||||
for manifest_name in os.listdir(self.manifests_path):
|
for manifest_name in os.listdir(self.manifests_path):
|
||||||
manifest_path = os.path.join(self.manifests_path, manifest_name)
|
manifest_path = os.path.join(self.manifests_path, manifest_name)
|
||||||
|
print manifest_path
|
||||||
if manifest_name.endswith('.manifest'):
|
if manifest_name.endswith('.manifest'):
|
||||||
stem = os.path.splitext(manifest_name)[0]
|
stem = os.path.splitext(manifest_name)[0]
|
||||||
manifest = RhizomeManifest.from_file(file(manifest_path))
|
manifest = RhizomeManifest.from_file(file(manifest_path))
|
||||||
@ -109,8 +119,18 @@ class RhizomeMirror(object):
|
|||||||
self.unlink(payload_path)
|
self.unlink(payload_path)
|
||||||
return True
|
return True
|
||||||
elif os.path.exists(payload_path):
|
elif os.path.exists(payload_path):
|
||||||
payload_hash = servald_rhizome_hash(self.opts, payload_path)
|
payload_hash = None
|
||||||
if payload_hash == manifest.filehash:
|
if self.hash_errors.get(manifest.id, 0) + self.opts.error_retry < time.time():
|
||||||
|
payload_hash = servald_rhizome_hash(self.opts, payload_path)
|
||||||
|
if payload_hash is None:
|
||||||
|
self.hash_errors[manifest.id] = time.time()
|
||||||
|
if payload_hash is None:
|
||||||
|
# Can't tell if payload matches manifest or not.
|
||||||
|
pass
|
||||||
|
elif payload_hash == manifest.filehash:
|
||||||
|
# This logic is DEFECTIVE for the case of encrypted payload, in which the hash is
|
||||||
|
# for the ciphertext, not the clear text, but we are extracting the clear text, so
|
||||||
|
# the hash will never match.
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
# Remove payload that does not match its manifest.
|
# Remove payload that does not match its manifest.
|
||||||
@ -156,8 +176,16 @@ class RhizomeMirror(object):
|
|||||||
self.unlink(payload_path)
|
self.unlink(payload_path)
|
||||||
elif os.path.exists(payload_path):
|
elif os.path.exists(payload_path):
|
||||||
if self.opts.paranoid:
|
if self.opts.paranoid:
|
||||||
payload_hash = servald_rhizome_hash(self.opts, payload_path)
|
payload_hash = None
|
||||||
if payload_hash != manifest.filehash:
|
if self.hash_errors.get(manifest.id, 0) + self.opts.error_retry < time.time():
|
||||||
|
payload_hash = servald_rhizome_hash(self.opts, payload_path)
|
||||||
|
if payload_hash is None:
|
||||||
|
self.hash_errors[manifest.id] = time.time()
|
||||||
|
if payload_hash is not None and payload_hash != manifest.filehash:
|
||||||
|
# This logic is DEFECTIVE for the case of encrypted payload, in
|
||||||
|
# which the hash is for the ciphertext, not the clear text, but
|
||||||
|
# we are extracting the clear text, so the hash will never
|
||||||
|
# match.
|
||||||
kwargs['payload_path'] = payload_path
|
kwargs['payload_path'] = payload_path
|
||||||
else:
|
else:
|
||||||
kwargs['payload_path'] = payload_path
|
kwargs['payload_path'] = payload_path
|
||||||
@ -174,14 +202,21 @@ class RhizomeMirror(object):
|
|||||||
# the already-extracted bundle (until expired).
|
# the already-extracted bundle (until expired).
|
||||||
pass
|
pass
|
||||||
if kwargs:
|
if kwargs:
|
||||||
res = servald_rhizome_extract(self.opts, manifest.id, **kwargs)
|
extracted = None
|
||||||
if res:
|
if self.extract_errors.get(manifest.id, 0) + self.opts.error_retry < time.time():
|
||||||
|
extracted = servald_rhizome_extract(self.opts, manifest.id, **kwargs)
|
||||||
|
if extracted is None:
|
||||||
|
self.extract_errors[manifest.id] = time.time()
|
||||||
|
if extracted is None:
|
||||||
|
pass
|
||||||
|
elif extracted:
|
||||||
extracted_manifest = RhizomeManifest.from_file(file(manifest_path))
|
extracted_manifest = RhizomeManifest.from_file(file(manifest_path))
|
||||||
if extracted_manifest is None or extracted_manifest.id != manifest.id or extracted_manifest.version != manifest.version:
|
if extracted_manifest is None or extracted_manifest.id != manifest.id or extracted_manifest.version != manifest.version:
|
||||||
error('invalid manifest extracted for bid=%s' % (manifest.id,))
|
error('invalid manifest extracted for bid=%s' % (manifest.id,))
|
||||||
self.unlink(payload_path)
|
self.unlink(payload_path)
|
||||||
self.unlink(manifest_path)
|
self.unlink(manifest_path)
|
||||||
self.extracted[stem] = None
|
self.extracted[stem] = None
|
||||||
|
self.extract_errors[manifest.id] = time.time()
|
||||||
else:
|
else:
|
||||||
self.extracted[stem] = extracted_manifest
|
self.extracted[stem] = extracted_manifest
|
||||||
|
|
||||||
@ -189,9 +224,9 @@ class RhizomeMirror(object):
|
|||||||
now = time.time()
|
now = time.time()
|
||||||
if self.available is not None:
|
if self.available is not None:
|
||||||
for stem, extracted_manifest in self.extracted.iteritems():
|
for stem, extracted_manifest in self.extracted.iteritems():
|
||||||
if stem not in self.available:
|
if extracted_manifest is not None and stem not in self.available:
|
||||||
manifest_path = self.manifest_path(manifest)
|
manifest_path = self.manifest_path(extracted_manifest)
|
||||||
payload_path = self.payload_path(manifest)
|
payload_path = self.payload_path(extracted_manifest)
|
||||||
if os.path.exists(manifest_path):
|
if os.path.exists(manifest_path):
|
||||||
if self.mtime(manifest_path) + self.opts.expire_delay < now:
|
if self.mtime(manifest_path) + self.opts.expire_delay < now:
|
||||||
self.unlink(payload_path)
|
self.unlink(payload_path)
|
||||||
@ -250,6 +285,25 @@ class RhizomeManifest(object):
|
|||||||
except ValueError:
|
except ValueError:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
def __eq__(self, other):
|
||||||
|
if not isinstance(other, type(self)):
|
||||||
|
return NotImplemented
|
||||||
|
return (self.service == other.service
|
||||||
|
and self.id == other.id
|
||||||
|
and self.version == other.version
|
||||||
|
and self.filesize == other.filesize
|
||||||
|
and self.date == other.date
|
||||||
|
and self.filehash == other.filehash
|
||||||
|
and self.sender == other.sender
|
||||||
|
and self.recipient == other.recipient
|
||||||
|
and self.name == other.name
|
||||||
|
and self._other == other._other)
|
||||||
|
|
||||||
|
def __ne__(self, other):
|
||||||
|
if not isinstance(other, type(self)):
|
||||||
|
return NotImplemented
|
||||||
|
return not self.__eq__(other)
|
||||||
|
|
||||||
def stem(self):
|
def stem(self):
|
||||||
return os.path.splitext(self.name)[0] + ':' + self.id[:12]
|
return os.path.splitext(self.name)[0] + ':' + self.id[:12]
|
||||||
|
|
||||||
@ -331,8 +385,9 @@ def file_hash(text):
|
|||||||
raise ValueError('invalid literal for file_hash(): %r' % (text,))
|
raise ValueError('invalid literal for file_hash(): %r' % (text,))
|
||||||
|
|
||||||
class ServaldInterfaceException(Exception):
|
class ServaldInterfaceException(Exception):
|
||||||
def __init__(self, servald, output, msg):
|
def __init__(self, servald, status, output, msg):
|
||||||
Exception.__init__(self, msg)
|
Exception.__init__(self, msg)
|
||||||
|
self.status = status
|
||||||
self.output = output
|
self.output = output
|
||||||
self.servald = servald
|
self.servald = servald
|
||||||
|
|
||||||
@ -344,7 +399,11 @@ class RhizomeListEntry(object):
|
|||||||
|
|
||||||
def servald_rhizome_list(opts):
|
def servald_rhizome_list(opts):
|
||||||
args = ['rhizome', 'list', 'file']
|
args = ['rhizome', 'list', 'file']
|
||||||
words = invoke_servald(opts, args, output_words=True)
|
try:
|
||||||
|
status, words = invoke_servald(opts, args, output_words=True)
|
||||||
|
except ServaldInterfaceException, e:
|
||||||
|
error(e)
|
||||||
|
return None
|
||||||
if words is None:
|
if words is None:
|
||||||
return None
|
return None
|
||||||
try:
|
try:
|
||||||
@ -371,7 +430,11 @@ def servald_rhizome_list(opts):
|
|||||||
|
|
||||||
def servald_rhizome_hash(opts, path):
|
def servald_rhizome_hash(opts, path):
|
||||||
args = ['rhizome', 'hash', 'file', path]
|
args = ['rhizome', 'hash', 'file', path]
|
||||||
out = invoke_servald(opts, args)
|
try:
|
||||||
|
status, out = invoke_servald(opts, args)
|
||||||
|
except ServaldInterfaceException, e:
|
||||||
|
error(e)
|
||||||
|
return None
|
||||||
if out is None:
|
if out is None:
|
||||||
return None
|
return None
|
||||||
if out.endswith('\n'):
|
if out.endswith('\n'):
|
||||||
@ -379,7 +442,7 @@ def servald_rhizome_hash(opts, path):
|
|||||||
try:
|
try:
|
||||||
return file_hash(out)
|
return file_hash(out)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
raise ServaldInterfaceException(opts.servald, out, 'invalid output, not a hex file hash')
|
raise ServaldInterfaceException(opts.servald, status, out, 'invalid output, not a hex file hash')
|
||||||
|
|
||||||
def servald_rhizome_extract(opts, bid, manifest_path=None, payload_path=None):
|
def servald_rhizome_extract(opts, bid, manifest_path=None, payload_path=None):
|
||||||
args = None
|
args = None
|
||||||
@ -391,7 +454,12 @@ def servald_rhizome_extract(opts, bid, manifest_path=None, payload_path=None):
|
|||||||
args = ['rhizome', 'extract', 'manifest', bid, manifest_path]
|
args = ['rhizome', 'extract', 'manifest', bid, manifest_path]
|
||||||
if not args:
|
if not args:
|
||||||
return None
|
return None
|
||||||
return invoke_servald(opts, args, output_keyvalue=True)
|
try:
|
||||||
|
status, out = invoke_servald(opts, args, output_keyvalue=True)
|
||||||
|
except ServaldInterfaceException, e:
|
||||||
|
error(e)
|
||||||
|
return None
|
||||||
|
return status == 0
|
||||||
|
|
||||||
def invoke_servald(opts, args, output_keyvalue=False, output_words=False):
|
def invoke_servald(opts, args, output_keyvalue=False, output_words=False):
|
||||||
env = dict(os.environ)
|
env = dict(os.environ)
|
||||||
@ -410,31 +478,30 @@ def invoke_servald(opts, args, output_keyvalue=False, output_words=False):
|
|||||||
out, err = proc.communicate()
|
out, err = proc.communicate()
|
||||||
except OSError, e:
|
except OSError, e:
|
||||||
error('cannot execute %s - %s' % (executable, e))
|
error('cannot execute %s - %s' % (executable, e))
|
||||||
return None
|
return None, None
|
||||||
if proc.returncode != 0:
|
if proc.returncode == 255:
|
||||||
error('%s exited with status %d' % (os.path.basename(opts.servald), proc.returncode))
|
allargs = (os.path.basename(opts.servald),) + tuple(args)
|
||||||
for line in err.split('\n'):
|
for line in err.split('\n'):
|
||||||
if line.startswith('ERROR:') or line.startswith('WARN:'):
|
if line.startswith('ERROR:') or line.startswith('WARN:'):
|
||||||
error(re.sub(r'^(ERROR|WARN):\s*(\[\d+])?\s*\d\d\:\d\d\:\d\d\.\d+\s*', '', line))
|
error(re.sub(r'^(ERROR|WARN):\s*(\[\d+])?\s*\d\d\:\d\d\:\d\d\.\d+\s*', '', line))
|
||||||
return None
|
raise ServaldInterfaceException(opts.servald, 255, None, 'exited with error')
|
||||||
if out is None:
|
if out is not None and (output_words or output_keyvalue):
|
||||||
return None
|
|
||||||
if output_words or output_keyvalue:
|
|
||||||
if not out.endswith(delim):
|
if not out.endswith(delim):
|
||||||
raise ServaldInterfaceException(opts.servald, out, 'missing delimiter')
|
raise ServaldInterfaceException(opts.servald, proc.returncode, out, 'missing delimiter')
|
||||||
out = out[:-1]
|
out = out[:-1]
|
||||||
words = out.split(delim)
|
words = out.split(delim)
|
||||||
if output_keyvalue:
|
if output_keyvalue:
|
||||||
keyvalue = {}
|
keyvalue = {}
|
||||||
if len(words) % 2 != 0:
|
if len(words) % 2 != 0:
|
||||||
raise ServaldInterfaceException(opts.servald, out, 'odd number of output fields')
|
raise ServaldInterfaceException(opts.servald, proc.returncode, out, 'odd number of output fields')
|
||||||
while words:
|
while words:
|
||||||
key = words.pop(0)
|
key = words.pop(0)
|
||||||
value = words.pop(0)
|
value = words.pop(0)
|
||||||
keyvalue[key] = value
|
keyvalue[key] = value
|
||||||
return keyvalue
|
out= keyvalue
|
||||||
return words
|
else:
|
||||||
return out
|
out = words
|
||||||
|
return proc.returncode, out
|
||||||
|
|
||||||
def log(msg):
|
def log(msg):
|
||||||
print '+ %s' % (msg,)
|
print '+ %s' % (msg,)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user