serval-dna/utilities/rhizome_mirrord
2013-02-18 17:24:01 +10:30

451 lines
17 KiB
Python
Executable File

#!/usr/bin/env python
# Rhizome mirror daemon
# Copyright (C) 2013 Serval Project Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""This daemon regularly extracts file-sharing (service=file) bundles from
Rhizome into a mirror directory, and unpacks all bundles which are archive
format (zip, tar, tgz, etc.), into their constituent files. If a newer version
of an already-unpacked bundle arrives, the prior unpack is deleted and the new
version unpacked in its place.
This effectively maintains the directory as an up-to-date mirror of the local
Rhizome store content.
On-disk file/directory names are formed by conactenating the bundle name field
and the bundle ID separated by ':', in order to avoid collisions.
@author Andrew Bettison <andrew@servalproject.com>
"""
import sys
import errno
import time
import os
import os.path
import re
import argparse
import subprocess
import datetime
import fnmatch
def main():
parser = argparse.ArgumentParser(description='Unpack Rhizome bundles into mirror directory.')
parser.add_argument('--servald', dest='servald', default='servald', help='Path of servald executable')
parser.add_argument('--servald-instance', dest='instancepath', default=os.environ.get('SERVALINSTANCE_PATH'), help='Path of servald instance directory')
parser.add_argument('--interval', dest='interval', type=float, default=0, help='Seconds to sleep between polling Rhizome')
parser.add_argument('--mirror-dir', dest='mirror_dir', required=True, help='Path of directory to store extracted payloads')
parser.add_argument('--filter-name', dest='name_filter', help='Only mirror bundles whose names match given Glob pattern')
parser.add_argument('--expire-delay', dest='expire_delay', default=0, help='Keep bundles in mirror for this many seconds after no longer listed by Rhizome')
parser.add_argument('--paranoid', dest='paranoid', action='store_true', help='Continually check for and correct corrupted mirror contents')
opts = parser.parse_args()
if opts.instancepath is None:
fatal('missing --servald-instance option')
if invoke_servald(opts, ['help']) is None:
fatal('no executable servald')
mirror = RhizomeMirror(opts)
mirror.seed()
while True:
mirror.list()
mirror.update()
mirror.expire()
if not opts.interval:
break
time.sleep(opts.interval)
sys.exit(0)
class RhizomeMirror(object):
def __init__(self, opts):
self.opts = opts
self.extracted = {}
self.available = None
self.payloads_path = opts.mirror_dir
self.manifests_path = os.path.join(self.payloads_path, '.manifests')
def manifest_path(self, manifest):
return os.path.join(self.manifests_path, manifest.stem()) + '.manifest'
def payload_path(self, manifest):
return os.path.join(self.payloads_path, manifest.stem()) + manifest.suffix()
def seed(self):
self.extracted = {}
try:
os.makedirs(self.manifests_path)
except OSError, e:
if e.errno != errno.EEXIST:
raise
for manifest_name in os.listdir(self.manifests_path):
manifest_path = os.path.join(self.manifests_path, manifest_name)
if manifest_name.endswith('.manifest'):
stem = os.path.splitext(manifest_name)[0]
manifest = RhizomeManifest.from_file(file(manifest_path))
if manifest is not None and stem == manifest.stem():
if self.sync(manifest):
log('seeded %r' % (stem,))
self.extracted[stem] = manifest
continue
# Remove invalid manifests.
self.unlink(manifest_path)
def sync(self, manifest):
payload_path = self.payload_path(manifest)
if manifest.filesize == 0:
self.unlink(payload_path)
return True
elif os.path.exists(payload_path):
payload_hash = servald_rhizome_hash(self.opts, payload_path)
if payload_hash == manifest.filehash:
return True
else:
# Remove payload that does not match its manifest.
self.unlink(payload_path)
# Not synced -- have to extract the payload.
return False
def filter(self, manifest):
if self.opts.name_filter:
if not fnmatch.fnmatch(manifest.name or '', self.opts.name_filter):
return False
return True
def list(self):
self.available = None
entries = servald_rhizome_list(self.opts)
if entries is not None:
self.available = {}
for ent in entries:
manifest = RhizomeManifest.from_list_entry(ent)
if manifest is not None and manifest.service == 'file':
stem = manifest.stem()
self.available[stem] = manifest
def update(self):
if self.available is not None:
for stem, manifest in self.available.iteritems():
manifest_path = self.manifest_path(manifest)
payload_path = self.payload_path(manifest)
if self.filter(manifest):
extracted_manifest = self.extracted.get(stem)
kwargs = {}
if extracted_manifest is None or manifest.succeeds(extracted_manifest):
kwargs['manifest_path'] = manifest_path
if manifest.filesize == 0:
self.unlink(payload_path)
else:
kwargs['payload_path'] = payload_path
elif manifest.id == extracted_manifest.id:
# Assume manifest and payload files are correct if present, unless
# in 'paranoid' mode.
if manifest.filesize == 0:
self.unlink(payload_path)
elif os.path.exists(payload_path):
if self.opts.paranoid:
payload_hash = servald_rhizome_hash(self.opts, payload_path)
if payload_hash != manifest.filehash:
kwargs['payload_path'] = payload_path
else:
kwargs['payload_path'] = payload_path
if os.path.exists(manifest_path):
self.touch(manifest_path) # Remember when this was last available
if self.opts.paranoid:
check_manifest = RhizomeManifest.from_file(file(manifest_path))
if check_manifest is None or check_manifest != extracted_manifest:
kwargs['manifest_path'] = manifest_path
else:
kwargs['manifest_path'] = manifest_path
else:
# Ignore listed manifests with the same stem but different bundle ID; keep
# the already-extracted bundle (until expired).
pass
if kwargs:
res = servald_rhizome_extract(self.opts, manifest.id, **kwargs)
if res:
extracted_manifest = RhizomeManifest.from_file(file(manifest_path))
if extracted_manifest is None or extracted_manifest.id != manifest.id or extracted_manifest.version != manifest.version:
error('invalid manifest extracted for bid=%s' % (manifest.id,))
self.unlink(payload_path)
self.unlink(manifest_path)
self.extracted[stem] = None
else:
self.extracted[stem] = extracted_manifest
def expire(self):
now = time.time()
if self.available is not None:
for stem, extracted_manifest in self.extracted.iteritems():
if stem not in self.available:
manifest_path = self.manifest_path(manifest)
payload_path = self.payload_path(manifest)
if os.path.exists(manifest_path):
if self.mtime(manifest_path) + self.opts.expire_delay < now:
self.unlink(payload_path)
self.unlink(manifest_path)
self.extracted[stem] = None
else:
self.unlink(payload_path)
def touch(self, path):
try:
open(path, "r+")
except OSError, e:
error('cannot touch %r - %s' % (path, e))
def unlink(self, path):
if os.path.exists(path):
log('unlink %r' % (path,))
try:
os.unlink(path)
except OSError, e:
error('cannot unlink %r - %s' % (path, e))
def mtime(self, path):
try:
return os.stat(path).st_mtime
except OSError, e:
if e.errno != errno.ENOENT:
error('cannot stat %r - %s' % (path, e))
return None
class RhizomeManifest(object):
def __init__(self, **fields):
self.service = str_nonempty(fields['service'])
self.id = bundle_id(fields['id'])
self.version = ulong(fields['version'])
self.filesize = ulong(fields['filesize']) if 'filesize' in fields else None
self.date = time_ms(fields['date']) if 'date' in fields else None
self.filehash = file_hash(fields['filehash']) if self.filesize or self.filesize is None else None
self.sender = subscriber_id(fields['sender']) if 'sender' in fields else None
self.recipient = subscriber_id(fields['recipient']) if 'recipient' in fields else None
self.name = str(fields['name']) if 'name' in fields else None
self._other = fields
@staticmethod
def fieldname(text):
if text.isalnum():
return text.lower()
raise ValueError('invalid literal for RhizomeManifest.fieldname(): %s' % (text,))
@classmethod
def is_fieldname(cls, text):
try:
cls.fieldname(text)
return True
except ValueError:
return False
def stem(self):
return os.path.splitext(self.name)[0] + ':' + self.id[:12]
def suffix(self):
return os.path.splitext(self.name)[1]
@classmethod
def from_list_entry(cls, ent):
fieldmap = {}
fields = dict((fieldmap.get(key, key), value) for key, value in ent.__dict__.iteritems() if cls.is_fieldname(key) and len(value))
return cls(**fields)
@classmethod
def from_file(cls, f):
body, sig = f.read().split('\0', 1)
if body.endswith('\n'):
body = body[:-1]
fields = {}
try:
for line in body.split('\n'):
field, value = line.split('=', 1)
fields[cls.fieldname(field)] = value
except (KeyError, ValueError):
return None
return cls(**fields)
def succeeds(self, other):
return self.id == other.id and self.version > other.version
def ulong(text):
n = long(text)
if n >= 0:
return n
raise ValueError('invalid literal for ulong(): %s' % (text,))
def str_nonempty(text):
s = str(text)
if len(s) > 0:
return s
raise ValueError('invalid literal for str_nonempty(): %s' % (text,))
def time_ms(text):
try:
return long(text)
except ValueError:
pass
raise ValueError('invalid literal for time_ms(): %r' % (text,))
def datetime_ms(text):
try:
ms = time_ms(text)
return datetime.fromtimestamp(ms / 1000).replace(microsecond= ms % 1000)
except ValueError:
pass
raise ValueError('invalid literal for datetime_ms(): %r' % (text,))
def subscriber_id(text):
try:
if len(text) == 64:
return '%064X' % long(text, 16)
except ValueError:
pass
raise ValueError('invalid literal for subscriber_id(): %r' % (text,))
def bundle_id(text):
try:
if len(text) == 64:
return '%064X' % long(text, 16)
except ValueError:
pass
raise ValueError('invalid literal for bundle_id(): %r' % (text,))
def file_hash(text):
try:
if len(text) == 128:
return '%0128X' % long(text, 16)
except ValueError:
pass
raise ValueError('invalid literal for file_hash(): %r' % (text,))
class ServaldInterfaceException(Exception):
def __init__(self, servald, output, msg):
Exception.__init__(self, msg)
self.output = output
self.servald = servald
class RhizomeListEntry(object):
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
def __repr__(self):
return '%s(%s)' % (self.__class__.__name__, ', '.join('%s=%r' % i for i in self.__dict__.iteritems()))
def servald_rhizome_list(opts):
args = ['rhizome', 'list', 'file']
words = invoke_servald(opts, args, output_words=True)
if words is None:
return None
try:
if len(words) < 1:
raise ValueError('missing first word')
ncols = int(words[0])
if len(words) < 1 + ncols:
raise ValueError('missing column header')
if (len(words) - (1 + ncols)) % ncols != 0:
raise ValueError('incomplete row')
colmap = {}
for col, hdr in enumerate(words[1:ncols+1]):
colmap[col] = re.sub(r'[^A-Za-z0-9_]', '_', hdr)
rows = []
for i in xrange(ncols + 1, len(words), ncols):
ent = RhizomeListEntry()
for col in xrange(ncols):
setattr(ent, colmap[col], words[i + col])
rows.append(ent)
return rows
except ValueError, e:
error('invalid output from %s: %s' % (' '.join([opts.servald,] + args), e))
return None
def servald_rhizome_hash(opts, path):
args = ['rhizome', 'hash', 'file', path]
out = invoke_servald(opts, args)
if out is None:
return None
if out.endswith('\n'):
out = out[:-1]
try:
return file_hash(out)
except ValueError:
raise ServaldInterfaceException(opts.servald, out, 'invalid output, not a hex file hash')
def servald_rhizome_extract(opts, bid, manifest_path=None, payload_path=None):
args = None
if payload_path and manifest_path:
args = ['rhizome', 'extract', 'bundle', bid, manifest_path, payload_path]
elif payload_path:
args = ['rhizome', 'extract', 'file', bid, payload_path]
elif manifest_path:
args = ['rhizome', 'extract', 'manifest', bid, manifest_path]
if not args:
return None
return invoke_servald(opts, args, output_keyvalue=True)
def invoke_servald(opts, args, output_keyvalue=False, output_words=False):
env = dict(os.environ)
if output_words or output_keyvalue:
delim = '\x01'
env['SERVALD_OUTPUT_DELIMITER'] = delim
env['SERVALD_INSTANCEPATH'] = opts.instancepath
try:
allargs = (opts.servald,) + tuple(args)
log('execute ' + ' '.join(map(repr, allargs)))
proc = subprocess.Popen(allargs,
stdout= subprocess.PIPE,
stderr= subprocess.PIPE,
env= env,
)
out, err = proc.communicate()
except OSError, e:
error('cannot execute %s - %s' % (executable, e))
return None
if proc.returncode != 0:
error('%s exited with status %d' % (os.path.basename(opts.servald), proc.returncode))
for line in err.split('\n'):
if line.startswith('ERROR:') or line.startswith('WARN:'):
error(re.sub(r'^(ERROR|WARN):\s*(\[\d+])?\s*\d\d\:\d\d\:\d\d\.\d+\s*', '', line))
return None
if out is None:
return None
if output_words or output_keyvalue:
if not out.endswith(delim):
raise ServaldInterfaceException(opts.servald, out, 'missing delimiter')
out = out[:-1]
words = out.split(delim)
if output_keyvalue:
keyvalue = {}
if len(words) % 2 != 0:
raise ServaldInterfaceException(opts.servald, out, 'odd number of output fields')
while words:
key = words.pop(0)
value = words.pop(0)
keyvalue[key] = value
return keyvalue
return words
return out
def log(msg):
print '+ %s' % (msg,)
def error(msg):
print >>sys.stderr, '%s: %s' % (os.path.basename(sys.argv[0]), msg)
def fatal(msg):
error(msg)
sys.exit(1)
if __name__ == '__main__':
main()