fuse/blackmatch: split into client/server (twisted server)

This implements a client/server split for blackmatch, where the client 
implements the fuse_main bindings and a simple blocking rpc client mechanism.
The server implements the other half of that rpc mechanism, and contains all
the actual logic for interpreting fuse requests in the context of the on disk
cache and requests to the tahoe node.  The server is based on a twisted reactor.

The rpc mechanism implements a simple method dispatch including marshalling,
using json, of basic inert data types, in a flat namespace (no objects).
The client side is written in a blocking idiom, to interface with the threading
model used by the fuse_main bindings, whereas the server side is written for a
twisted reactor-based environment, intended to facilitate implementing more 
sophisticated logic in that paradigm.  The two communicate over a unix domain
socket, allocated within the nodedir.

Command line usage is unchanged; the server is launched automatically by the
client. The server daemonizes itself, to avoid preventing the original parent
process (e.g. 'runtests') from waiting upon the server exiting.

The client keeps open a 'keepalive' connection to the server; upon loss thereof
the server will exit. This addresses the fact that the python-fuse bindings 
provide no notification of exit of the client process upon unmount.

The client thus provides a relatively thin 'shim' proxying requests from the
fuse_main bindings across the rpc to the server process, which handles the 
logic behind each request.  

For the time being, a '--no-split' option is provided to surpress the splitting
into client/server, yielding the prior behaviour.  Once the server logic gets
more complex and more entrenched in a twisted idiom, this might be removed.
The 'runtests' test harness currently tests both modes, as 'impl_c' and 
'impl_c_no_split'
This commit is contained in:
robk-tahoe 2008-10-16 08:08:46 -07:00
parent c0b2aae0d4
commit 4bc57f19c9
2 changed files with 583 additions and 65 deletions

View File

@ -5,25 +5,25 @@ from allmydata.uri import CHKFileURI, NewDirectoryURI, LiteralFileURI
from allmydata.scripts.common_http import do_http as do_http_req
from allmydata.util.hashutil import tagged_hash
from allmydata.util.assertutil import precondition
from allmydata.util import base32
from allmydata.util import base32, fileutil
from allmydata.scripts.common import get_aliases
from twisted.python import usage
from twisted.internet.protocol import Factory, Protocol
from twisted.internet import reactor, defer
import base64
import errno
import sha
import socket
import stat
import subprocess
import sys
import os
#import pprint
import errno
import stat
# pull in some spaghetti to make this stuff work without fuse-py being installed
try:
import _find_fuse_parts
junk = _find_fuse_parts
del junk
except ImportError:
pass
# one needs either python-fuse to have been installed in sys.path, or
# suitable affordances to be made in the build or runtime environment
import fuse
import time
@ -31,7 +31,7 @@ import traceback
import simplejson
import urllib
VERSIONSTR="0.6"
VERSIONSTR="0.7"
USAGE = 'usage: tahoe fuse [dir_cap_name] [fuse_options] mountpoint'
DEFAULT_DIRECTORY_VALIDITY=26
@ -62,6 +62,14 @@ class TahoeFuseOptions(usage.Options):
["cache-timeout", None, 20,
"Time, in seconds, to cache directory data."],
]
optFlags = [
['no-split', None,
'run stand-alone; no splitting into client and server'],
['server', None,
'server mode (should not be used by end users)'],
['server-shutdown', None,
'shutdown server (should not be used by end users)'],
]
def __init__(self):
usage.Options.__init__(self)
@ -84,6 +92,12 @@ class TahoeFuseOptions(usage.Options):
logfile = file('tfuse.log', 'ab')
def reopen_logfile(fname):
global logfile
log('switching to %s' % (fname,))
logfile.close()
logfile = file(fname, 'ab')
def log(msg):
logfile.write("%s: %s\n" % (time.asctime(), msg))
#time.sleep(0.1)
@ -273,31 +287,22 @@ class TahoeFuseFile(object):
self.log("fgetattr()")
s = os.fstat(self.fd)
self.log("fgetattr() -> %r" % (s,))
return s
return stat_to_dict(s)
@logexc
def ftruncate(self, len):
self.log("ftruncate(%r)" % (len,))
self.file.truncate(len)
class TahoeFuse(fuse.Fuse):
def __init__(self, tfs, *args, **kw):
log("TF: __init__(%r, %r)" % (args, kw))
class TahoeFuseBase(object):
def __init__(self, tfs):
log("TFB: __init__()")
self.tfs = tfs
#_tfs_ = tfs
#class MyFuseFile(TahoeFuseFile):
#tfs = _tfs_
#self.file_class = MyFuseFile
#log("TF: file_class: %r" % (self.file_class,))
self.files = {}
fuse.Fuse.__init__(self, *args, **kw)
def log(self, msg):
log("<TF> %s" % (msg, ))
log("<TFB> %s" % (msg, ))
@logexc
def readlink(self, path):
@ -379,7 +384,8 @@ class TahoeFuse(fuse.Fuse):
fs_size = 8*2**40 # 8Tb
fs_free = 2*2**40 # 2Tb
s = fuse.StatVfs(f_bsize = preferred_block_size,
#s = fuse.StatVfs(f_bsize = preferred_block_size,
s = dict(f_bsize = preferred_block_size,
f_frsize = block_size,
f_blocks = fs_size / block_size,
f_bfree = fs_free / block_size,
@ -389,16 +395,12 @@ class TahoeFuse(fuse.Fuse):
f_favail = 2**20, # available files (root)
f_flag = 2, # no suid
f_namemax = 255) # max name length
#self.log('statfs(): %r' % (s,))
return s
def fsinit(self):
self.log("fsinit()")
def main(self, *a, **kw):
self.log("main(%r, %r)" % (a, kw))
return fuse.Fuse.main(self, *a, **kw)
##################################################################
@logexc
@ -409,7 +411,8 @@ class TahoeFuse(fuse.Fuse):
return -errno.ENOENT
dirlist = ['.', '..'] + node.children.keys()
self.log('dirlist = %r' % (dirlist,))
return [fuse.Direntry(d) for d in dirlist]
#return [fuse.Direntry(d) for d in dirlist]
return dirlist
@logexc
def getattr(self, path):
@ -421,12 +424,13 @@ class TahoeFuse(fuse.Fuse):
mtime = self.tfs.root.mtime
s = TStat({}, st_mode=mode, st_nlink=1, st_mtime=mtime)
self.log('getattr(%r) -> %r' % (path, s))
return s
#return s
return stat_to_dict(s)
parent, name, child = self.tfs.get_parent_name_and_child(path)
if not child: # implicitly 'or not parent'
raise ENOENT('No such file or directory')
return parent.get_stat(name)
return stat_to_dict(parent.get_stat(name))
@logexc
def access(self, path, mode):
@ -510,14 +514,197 @@ class TahoeFuse(fuse.Fuse):
self.log("ftruncate(%r, %r)" % (path, len,))
return self._get_file(path).ftruncate(len)
class TahoeFuseLocal(TahoeFuseBase, fuse.Fuse):
def __init__(self, tfs, *args, **kw):
log("TFL: __init__(%r, %r)" % (args, kw))
TahoeFuseBase.__init__(self, tfs)
fuse.Fuse.__init__(self, *args, **kw)
def launch_tahoe_fuse(tfs, argv):
def log(self, msg):
log("<TFL> %s" % (msg, ))
def main(self, *a, **kw):
self.log("main(%r, %r)" % (a, kw))
return fuse.Fuse.main(self, *a, **kw)
# overrides for those methods which return objects not marshalled
def fgetattr(self, path):
return TStat({}, **(TahoeFuseBase.fgetattr(self, path)))
def getattr(self, path):
return TStat({}, **(TahoeFuseBase.getattr(self, path)))
def statfs(self):
return fuse.StatVfs(**(TahoeFuseBase.statfs(self)))
#self.log('statfs()')
#ret = fuse.StatVfs(**(TahoeFuseBase.statfs(self)))
#self.log('statfs(): %r' % (ret,))
#return ret
@logexc
def readdir(self, path, offset):
return [ fuse.Direntry(d) for d in TahoeFuseBase.readdir(self, path, offset) ]
class TahoeFuseShim(fuse.Fuse):
def __init__(self, trpc, *args, **kw):
log("TF: __init__(%r, %r)" % (args, kw))
self.trpc = trpc
fuse.Fuse.__init__(self, *args, **kw)
def log(self, msg):
log("<TFs> %s" % (msg, ))
@logexc
def readlink(self, path):
self.log("readlink(%r)" % (path,))
return self.trpc.call('readlink', path)
@logexc
def unlink(self, path):
self.log("unlink(%r)" % (path,))
return self.trpc.call('unlink', path)
@logexc
def rmdir(self, path):
self.log("rmdir(%r)" % (path,))
return self.trpc.call('unlink', path)
@logexc
def symlink(self, path, path1):
self.log("symlink(%r, %r)" % (path, path1))
return self.trpc.call('link', path, path1)
@logexc
def rename(self, path, path1):
self.log("rename(%r, %r)" % (path, path1))
return self.trpc.call('rename', path, path1)
@logexc
def link(self, path, path1):
self.log("link(%r, %r)" % (path, path1))
return self.trpc.call('link', path, path1)
@logexc
def chmod(self, path, mode):
self.log("XX chmod(%r, %r)" % (path, mode))
return self.trpc.call('chmod', path, mode)
@logexc
def chown(self, path, user, group):
self.log("XX chown(%r, %r, %r)" % (path, user, group))
return self.trpc.call('chown', path, user, group)
@logexc
def truncate(self, path, len):
self.log("XX truncate(%r, %r)" % (path, len))
return self.trpc.call('truncate', path, len)
@logexc
def utime(self, path, times):
self.log("XX utime(%r, %r)" % (path, times))
return self.trpc.call('utime', path, times)
@logexc
def statfs(self):
self.log("statfs()")
response = self.trpc.call('statfs')
#self.log("statfs(): %r" % (response,))
kwargs = dict([ (str(k),v) for k,v in response.items() ])
return fuse.StatVfs(**kwargs)
def fsinit(self):
self.log("fsinit()")
def main(self, *a, **kw):
self.log("main(%r, %r)" % (a, kw))
return fuse.Fuse.main(self, *a, **kw)
##################################################################
@logexc
def readdir(self, path, offset):
self.log('readdir(%r, %r)' % (path, offset))
return [ fuse.Direntry(d) for d in self.trpc.call('readdir', path, offset) ]
@logexc
def getattr(self, path):
self.log('getattr(%r)' % (path,))
response = self.trpc.call('getattr', path)
kwargs = dict([ (str(k),v) for k,v in response.items() ])
s = TStat({}, **kwargs)
self.log('getattr(%r) -> %r' % (path, s))
return s
@logexc
def access(self, path, mode):
self.log("access(%r, %r)" % (path, mode))
return self.trpc.call('access', path, mode)
@logexc
def mkdir(self, path, mode):
self.log("mkdir(%r, %r)" % (path, mode))
return self.trpc.call('mkdir', path, mode)
##################################################################
# file methods
def open(self, path, flags):
self.log('open(%r, %r)' % (path, flags, ))
return self.trpc.call('open', path, flags)
def create(self, path, flags, mode):
self.log('create(%r, %r, %r)' % (path, flags, mode))
return self.trpc.call('create', path, flags, mode)
##
def read(self, path, size, offset):
self.log('read(%r, %r, %r)' % (path, size, offset, ))
return self.trpc.call('read', path, size, offset)
@logexc
def write(self, path, buf, offset):
self.log("write(%r, -%s-, %r)" % (path, len(buf), offset))
return self.trpc.call('write', path, buf, offset)
@logexc
def release(self, path, flags):
self.log("release(%r, %r)" % (path, flags,))
return self.trpc.call('release', path, flags)
@logexc
def fsync(self, path, isfsyncfile):
self.log("fsync(%r, %r)" % (path, isfsyncfile,))
return self.trpc.call('fsync', path, isfsyncfile)
@logexc
def flush(self, path):
self.log("flush(%r)" % (path,))
return self.trpc.call('flush', path)
@logexc
def fgetattr(self, path):
self.log("fgetattr(%r)" % (path,))
#return self.trpc.call('fgetattr', path)
response = self.trpc.call('fgetattr', path)
kwargs = dict([ (str(k),v) for k,v in response.items() ])
s = TStat({}, **kwargs)
self.log('getattr(%r) -> %r' % (path, s))
return s
@logexc
def ftruncate(self, path, len):
self.log("ftruncate(%r, %r)" % (path, len,))
return self.trpc.call('ftruncate', path, len)
def launch_tahoe_fuse(tf_class, tobj, argv):
sys.argv = ['tahoe fuse'] + list(argv)
log('setting sys.argv=%r' % (sys.argv,))
config = TahoeFuseOptions()
server = TahoeFuse(tfs, version="%prog " +VERSIONSTR+", fuse "+ fuse.__version__,
usage=config.getSynopsis(),
dash_s_do='setsingle')
version = "%prog " +VERSIONSTR+", fuse "+ fuse.__version__
server = tf_class(tobj, version=version, usage=config.getSynopsis(), dash_s_do='setsingle')
server.parse(errex=1)
server.main()
@ -534,6 +721,16 @@ def fingerprint(uri):
return None
return base64.b32encode(sha.new(uri).digest()).lower()[:6]
stat_fields = [ 'st_mode', 'st_ino', 'st_dev', 'st_nlink', 'st_uid', 'st_gid', 'st_size',
'st_atime', 'st_mtime', 'st_ctime', ]
def stat_to_dict(statobj, fields=None):
if fields is None:
fields = stat_fields
d = {}
for f in fields:
d[f] = getattr(statobj, f, None)
return d
class TStat(fuse.Stat):
# in fuse 0.2, these are set by fuse.Stat.__init__
# in fuse 0.2-pre3 (hardy) they are not. badness unsues if they're missing
@ -563,10 +760,7 @@ class TStat(fuse.Stat):
fuse.Stat.__init__(self, **kwargs)
def __repr__(self):
d = {}
for f in self.fields:
d[f] = getattr(self, f, None)
return "<Stat%r>" % (d,)
return "<Stat%r>" % (stat_to_dict(self),)
class Directory(object):
def __init__(self, tfs, ro_uri, rw_uri):
@ -594,7 +788,6 @@ class Directory(object):
def load(self, name=None):
now = time.time()
print 'loading', name or self
log('%s.loading(%s)' % (self, name))
url = self.tfs.compose_url("uri/%s?t=json", self.get_uri())
data = urllib.urlopen(url).read()
@ -916,18 +1109,259 @@ _tfs = None # to appease pyflakes; is set in main()
def print_tree():
log('tree:\n' + _tfs.pprint())
def main(argv):
log("\n\nmain(%s)" % (argv,))
def unmarshal(obj):
if obj is None or isinstance(obj, int) or isinstance(obj, long) or isinstance(obj, float):
return obj
elif isinstance(obj, unicode):
#log('unmarshal(%r)' % (obj,))
return base64.b64decode(obj)
elif isinstance(obj, list):
return map(unmarshal, obj)
elif isinstance(obj, dict):
return dict([ (k,unmarshal(v)) for k,v in obj.items() ])
else:
raise ValueError('object type not int,str,list,dict,none (%s) (%r)' % (type(obj), obj))
def marshal(obj):
if obj is None or isinstance(obj, int) or isinstance(obj, long) or isinstance(obj, float):
return obj
elif isinstance(obj, str):
return base64.b64encode(obj)
elif isinstance(obj, list) or isinstance(obj, tuple):
return map(marshal, obj)
elif isinstance(obj, dict):
return dict([ (k,marshal(v)) for k,v in obj.items() ])
else:
raise ValueError('object type not int,str,list,dict,none (%s)' % type(obj))
class TRPCProtocol(Protocol):
compute_response_sha1 = True
log_all_requests = False
def connectionMade(self):
self.buf = []
def dataReceived(self, data):
if data == 'keepalive\n':
log('keepalive connection on %r' % (self.transport,))
self.keepalive = True
return
if not data.endswith('\n'):
self.buf.append(data)
return
if self.buf:
self.buf.append(data)
reqstr = ''.join(self.buf)
self.buf = []
self.dispatch_request(reqstr)
else:
self.dispatch_request(data)
def dispatch_request(self, reqstr):
try:
req = simplejson.loads(reqstr)
except ValueError, ve:
log(ve)
return
d = defer.maybeDeferred(self.handle_request, req)
d.addCallback(self.send_response)
d.addErrback(self.send_error)
def send_error(self, failure):
log('failure: %s' % (failure,))
if failure.check(TFSIOError):
e = failure.value
self.send_response(['error', 'errno', e.args[0], e.args[1]])
else:
self.send_response(['error', 'failure', str(failure)])
def send_response(self, result):
response = simplejson.dumps(result)
header = { 'len': len(response), }
if self.compute_response_sha1:
header['sha1'] = base64.b64encode(sha.new(response).digest())
hdr = simplejson.dumps(header)
self.transport.write(hdr)
self.transport.write('\n')
self.transport.write(response)
self.transport.loseConnection()
def connectionLost(self, reason):
if hasattr(self, 'keepalive'):
log('keepalive connection %r lost, shutting down' % (self.transport,))
reactor.callLater(0, reactor.stop)
def handle_request(self, req):
if type(req) is not list or not req or len(req) < 1:
return ['error', 'malformed request']
if req[0] == 'call':
if len(req) < 3:
return ['error', 'malformed request']
methname = req[1]
try:
args = unmarshal(req[2])
except ValueError, ve:
return ['error', 'malformed arguments', str(ve)]
try:
meth = getattr(self.factory.server, methname)
except AttributeError, ae:
return ['error', 'no such method', str(ae)]
if self.log_all_requests:
log('call %s(%s)' % (methname, ', '.join(map(repr, args))))
try:
result = meth(*args)
except TFSIOError, e:
log('errno: %s; %s' % e.args)
return ['error', 'errno', e.args[0], e.args[1]]
except Exception, e:
log('exception: ' + traceback.format_exc())
return ['error', 'exception', str(e)]
d = defer.succeed(None)
d.addCallback(lambda junk: result) # result may be Deferred
d.addCallback(lambda res: ['result', marshal(res)]) # only applies if not errback
return d
class TFSServer(object):
def __init__(self, socket_path, server=None):
self.socket_path = socket_path
log('TFSServer init socket: %s' % (socket_path,))
self.factory = Factory()
self.factory.protocol = TRPCProtocol
if server:
self.factory.server = server
else:
self.factory.server = self
def get_service(self):
if not hasattr(self, 'svc'):
from twisted.application import strports
self.svc = strports.service('unix:'+self.socket_path, self.factory)
return self.svc
def run(self):
svc = self.get_service()
def ss():
try:
svc.startService()
except:
reactor.callLater(0, reactor.stop)
raise
reactor.callLater(0, ss)
reactor.run()
def hello(self):
return 'pleased to meet you'
def echo(self, arg):
return arg
def failex(self):
raise ValueError('expected')
def fail(self):
return defer.maybeDeferred(self.failex)
class RPCError(RuntimeError):
pass
class TRPC(object):
def __init__(self, socket_fname):
self.socket_fname = socket_fname
self.keepalive = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.keepalive.connect(self.socket_fname)
self.keepalive.send('keepalive\n')
log('requested keepalive on %s' % (self.keepalive,))
def req(self, req):
# open conenction to trpc server
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.connect(self.socket_fname)
# send request
s.send(simplejson.dumps(req))
s.send('\n')
# read response header
hdr_data = s.recv(8192)
first_newline = hdr_data.index('\n')
header = hdr_data[:first_newline]
data = hdr_data[first_newline+1:]
hdr = simplejson.loads(header)
hdr_len = hdr['len']
if hdr.has_key('sha1'):
hdr_sha1 = base64.b64decode(hdr['sha1'])
spool = [data]
spool_sha = sha.new(data)
# spool response
while True:
data = s.recv(8192)
if data:
spool.append(data)
spool_sha.update(data)
else:
break
else:
spool = [data]
# spool response
while True:
data = s.recv(8192)
if data:
spool.append(data)
else:
break
s.close()
# decode response
resp = ''.join(spool)
spool = None
assert hdr_len == len(resp), str((hdr_len, len(resp), repr(resp)))
if hdr.has_key('sha1'):
data_sha1 = spool_sha.digest()
spool = spool_sha = None
assert hdr_sha1 == data_sha1, str((base32.b2a(hdr_sha1), base32.b2a(data_sha1)))
#else:
#print 'warning, server provided no sha1 to check'
return resp
def call(self, methodname, *args):
res = self.req(['call', methodname, marshal(args)])
result = simplejson.loads(res)
if not result or len(result) < 2:
raise TypeError('malformed response %r' % (result,))
if result[0] == 'error':
if result[1] == 'errno':
raise TFSIOError(result[2], result[3])
else:
raise RPCError(*(result[1:])) # error, exception / error, failure
elif result[0] == 'result':
return unmarshal(result[1])
else:
raise TypeError('unknown response type %r' % (result[0],))
def shutdown(self):
log('shutdown() closing keepalive %s' % (self.keepalive,))
self.keepalive.close()
def main(argv):
log("main(%s)" % (argv,))
# check for version or help options (no args == help)
if not argv:
argv = ['--help']
if len(argv) == 1 and argv[0] in ['-h', '--help', '--version']:
if len(argv) == 1 and argv[0] in ['-h', '--help']:
config = TahoeFuseOptions()
print >> sys.stderr, config
print >> sys.stderr, 'fuse usage follows:'
launch_tahoe_fuse(None, argv)
if len(argv) == 1 and argv[0] in ['-h', '--help', '--version']:
launch_tahoe_fuse(TahoeFuseLocal, None, argv)
return -2
# parse command line options
config = TahoeFuseOptions()
try:
#print 'parsing', argv
@ -937,6 +1371,7 @@ def main(argv):
print e
return -1
# check for which alias or uri is specified
if config['alias']:
alias = config['alias']
#print 'looking for aliases in', config['node-directory']
@ -944,9 +1379,10 @@ def main(argv):
if alias not in aliases:
raise usage.error('Alias %r not found' % (alias,))
root_uri = aliases[alias]
root_name = alias
elif config['root-uri']:
root_uri = config['root-uri']
alias = 'root-uri'
root_name = 'uri_' + base32.b2a(tagged_hash('root_name', root_uri))[:12]
# test the uri for structural validity:
try:
NewDirectoryURI.init_from_string(root_uri)
@ -955,33 +1391,109 @@ def main(argv):
else:
raise usage.error('At least one of --alias or --root-uri must be specified')
nodedir = config['node-directory']
nodeurl = config['node-url']
if not nodeurl:
nodeurl = getnodeurl(nodedir)
# switch to named log file.
global logfile
fname = 'tfuse.%s.log' % (alias,)
log('switching to %s' % (fname,))
logfile.close()
logfile = file(fname, 'ab')
log('\n'+(24*'_')+'init'+(24*'_')+'\n')
# allocate socket
socket_dir = os.path.join(os.path.expanduser(nodedir), "tfuse.sockets")
socket_path = os.path.join(socket_dir, root_name)
if len(socket_path) > 103:
# try googling AF_UNIX and sun_len for some taste of why this oddity exists.
raise OSError(errno.ENAMETOOLONG, 'socket path too long (%s)' % (socket_path,))
fileutil.make_dirs(socket_dir, 0700)
if os.path.exists(socket_path):
log('socket exists')
if config['server-shutdown']:
log('calling shutdown')
trpc = TRPC(socket_path)
result = trpc.shutdown()
log('result: %r' % (result,))
log('called shutdown')
return
else:
raise OSError(errno.EEXIST, 'fuse already running (%r exists)' % (socket_path,))
elif config['server-shutdown']:
raise OSError(errno.ENOTCONN, '--server-shutdown specified, but server not running')
if not os.path.exists(config.mountpoint):
raise OSError(2, 'No such file or directory: "%s"' % (config.mountpoint,))
raise OSError(errno.ENOENT, 'No such file or directory: "%s"' % (config.mountpoint,))
cache_timeout = float(config['cache-timeout'])
tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout)
print tfs.pprint()
# make tfs instance accesible to print_tree() for dbg
global _tfs
_tfs = tfs
#
# Standalone ("no-split")
#
if config['no-split']:
reopen_logfile('tfuse.%s.unsplit.log' % (root_name,))
log('\n'+(24*'_')+'init (unsplit)'+(24*'_')+'\n')
args = [ '-o'+opt for opt in config.fuse_options ] + [config.mountpoint]
launch_tahoe_fuse(tfs, args)
cache_timeout = float(config['cache-timeout'])
tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout)
#print tfs.pprint()
# make tfs instance accesible to print_tree() for dbg
_tfs = tfs
args = [ '-o'+opt for opt in config.fuse_options ] + [config.mountpoint]
launch_tahoe_fuse(TahoeFuseLocal, tfs, args)
#
# Server
#
elif config['server']:
reopen_logfile('tfuse.%s.server.log' % (root_name,))
log('\n'+(24*'_')+'init (server)'+(24*'_')+'\n')
log('daemonizing')
from twisted.scripts._twistd_unix import daemonize
daemonize()
cache_timeout = float(config['cache-timeout'])
tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout)
#print tfs.pprint()
# make tfs instance accesible to print_tree() for dbg
_tfs = tfs
log('launching tfs server')
tfuse = TahoeFuseBase(tfs)
tfs_server = TFSServer(socket_path, tfuse)
tfs_server.run()
log('tfs server ran, exiting')
#
# Client
#
else:
reopen_logfile('tfuse.%s.client.log' % (root_name,))
log('\n'+(24*'_')+'init (client)'+(24*'_')+'\n')
server_args = [sys.executable, sys.argv[0], '--server'] + argv
if 'Allmydata.app/Contents/MacOS' in sys.executable:
# in this case blackmatch is the 'fuse' subcommand of the 'tahoe' executable
# otherwise we assume blackmatch is being run from source
server_args.insert(2, 'fuse')
#print 'launching server:', server_args
server = subprocess.Popen(server_args)
waiting_since = time.time()
wait_at_most = 8
while not os.path.exists(socket_path):
log('waiting for appearance of %r' % (socket_path,))
print 'waiting...'
time.sleep(1)
if time.time() - waiting_since > wait_at_most:
log('%r did not appear within %ss' % (socket_path, wait_at_most))
raise IOError(2, 'no socket %s' % (socket_path,))
print 'running...'
#print 'launched server'
trpc = TRPC(socket_path)
args = [ '-o'+opt for opt in config.fuse_options ] + [config.mountpoint]
launch_tahoe_fuse(TahoeFuseShim, trpc, args)
if __name__ == '__main__':
sys.exit(main(sys.argv[1:]))

View File

@ -68,6 +68,12 @@ implementations = {
'--node-directory', '%(nodedir)s', '%(mountpath)s', ],
mount_wait=True,
suites=['read', 'write', ]),
'impl_c_no_split': dict(module=impl_c,
mount_args=['--cache-timeout', '0', '--root-uri', '%(root-uri)s',
'--no-split',
'--node-directory', '%(nodedir)s', '%(mountpath)s', ],
mount_wait=True,
suites=['read', 'write', ]),
}
if sys.platform == 'darwin':