mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-01-20 11:38:52 +00:00
1715 lines
58 KiB
Python
1715 lines
58 KiB
Python
#!/usr/bin/env python
|
|
|
|
#-----------------------------------------------------------------------------------------------
|
|
from allmydata.uri import CHKFileURI, DirectoryURI, LiteralFileURI, is_literal_file_uri
|
|
from allmydata.scripts.common_http import do_http as do_http_req
|
|
from allmydata.util.hashutil import tagged_hash
|
|
from allmydata.util.assertutil import precondition
|
|
from allmydata.util import base32, fileutil, observer
|
|
from allmydata.scripts.common import get_aliases
|
|
|
|
from twisted.python import usage
|
|
from twisted.python.failure import Failure
|
|
from twisted.internet.protocol import Factory, Protocol
|
|
from twisted.internet import reactor, defer, task
|
|
from twisted.web import client
|
|
|
|
import base64
|
|
import errno
|
|
import heapq
|
|
import sha
|
|
import socket
|
|
import stat
|
|
import subprocess
|
|
import sys
|
|
import os
|
|
import weakref
|
|
#import pprint
|
|
|
|
# one needs either python-fuse to have been installed in sys.path, or
|
|
# suitable affordances to be made in the build or runtime environment
|
|
import fuse
|
|
|
|
import time
|
|
import traceback
|
|
import simplejson
|
|
import urllib
|
|
|
|
VERSIONSTR="0.7"
|
|
|
|
USAGE = 'usage: tahoe fuse [dir_cap_name] [fuse_options] mountpoint'
|
|
DEFAULT_DIRECTORY_VALIDITY=26
|
|
|
|
if not hasattr(fuse, '__version__'):
|
|
raise RuntimeError, \
|
|
"your fuse-py doesn't know of fuse.__version__, probably it's too old."
|
|
|
|
fuse.fuse_python_api = (0, 2)
|
|
fuse.feature_assert('stateful_files', 'has_init')
|
|
|
|
class TahoeFuseOptions(usage.Options):
|
|
optParameters = [
|
|
["node-directory", None, "~/.tahoe",
|
|
"Look here to find out which Tahoe node should be used for all "
|
|
"operations. The directory should either contain a full Tahoe node, "
|
|
"or a file named node.url which points to some other Tahoe node. "
|
|
"It should also contain a file named private/aliases which contains "
|
|
"the mapping from alias name to root dirnode URI."
|
|
],
|
|
["node-url", None, None,
|
|
"URL of the tahoe node to use, a URL like \"http://127.0.0.1:3456\". "
|
|
"This overrides the URL found in the --node-directory ."],
|
|
["alias", None, None,
|
|
"Which alias should be mounted."],
|
|
["root-uri", None, None,
|
|
"Which root directory uri should be mounted."],
|
|
["cache-timeout", None, 20,
|
|
"Time, in seconds, to cache directory data."],
|
|
]
|
|
optFlags = [
|
|
['no-split', None,
|
|
'run stand-alone; no splitting into client and server'],
|
|
['server', None,
|
|
'server mode (should not be used by end users)'],
|
|
['server-shutdown', None,
|
|
'shutdown server (should not be used by end users)'],
|
|
]
|
|
|
|
def __init__(self):
|
|
usage.Options.__init__(self)
|
|
self.fuse_options = []
|
|
self.mountpoint = None
|
|
|
|
def opt_option(self, fuse_option):
|
|
"""
|
|
Pass mount options directly to fuse. See below.
|
|
"""
|
|
self.fuse_options.append(fuse_option)
|
|
|
|
opt_o = opt_option
|
|
|
|
def parseArgs(self, mountpoint=''):
|
|
self.mountpoint = mountpoint
|
|
|
|
def getSynopsis(self):
|
|
return "%s [options] mountpoint" % (os.path.basename(sys.argv[0]),)
|
|
|
|
logfile = file('tfuse.log', 'ab')
|
|
|
|
def reopen_logfile(fname):
|
|
global logfile
|
|
log('switching to %s' % (fname,))
|
|
logfile.close()
|
|
logfile = file(fname, 'ab')
|
|
|
|
def log(msg):
|
|
logfile.write("%s: %s\n" % (time.asctime(), msg))
|
|
#time.sleep(0.1)
|
|
logfile.flush()
|
|
|
|
fuse.flog = log
|
|
|
|
def unicode_to_utf8_or_str(u):
|
|
if isinstance(u, unicode):
|
|
return u.encode('utf-8')
|
|
else:
|
|
precondition(isinstance(u, str), repr(u))
|
|
return u
|
|
|
|
def do_http(method, url, body=''):
|
|
resp = do_http_req(method, url, body)
|
|
log('do_http(%s, %s) -> %s, %s' % (method, url, resp.status, resp.reason))
|
|
if resp.status not in (200, 201):
|
|
raise RuntimeError('http response (%s, %s)' % (resp.status, resp.reason))
|
|
else:
|
|
return resp.read()
|
|
|
|
def flag2mode(flags):
|
|
log('flag2mode(%r)' % (flags,))
|
|
#md = {os.O_RDONLY: 'r', os.O_WRONLY: 'w', os.O_RDWR: 'w+'}
|
|
md = {os.O_RDONLY: 'rb', os.O_WRONLY: 'wb', os.O_RDWR: 'w+b'}
|
|
m = md[flags & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR)]
|
|
|
|
if flags & os.O_APPEND:
|
|
m = m.replace('w', 'a', 1)
|
|
|
|
return m
|
|
|
|
class TFSIOError(IOError):
|
|
pass
|
|
|
|
class ENOENT(TFSIOError):
|
|
def __init__(self, msg):
|
|
TFSIOError.__init__(self, errno.ENOENT, msg)
|
|
|
|
class EINVAL(TFSIOError):
|
|
def __init__(self, msg):
|
|
TFSIOError.__init__(self, errno.EINVAL, msg)
|
|
|
|
class EACCESS(TFSIOError):
|
|
def __init__(self, msg):
|
|
TFSIOError.__init__(self, errno.EACCESS, msg)
|
|
|
|
class EEXIST(TFSIOError):
|
|
def __init__(self, msg):
|
|
TFSIOError.__init__(self, errno.EEXIST, msg)
|
|
|
|
class EIO(TFSIOError):
|
|
def __init__(self, msg):
|
|
TFSIOError.__init__(self, errno.EIO, msg)
|
|
|
|
def logargsretexc(meth):
|
|
def inner_logargsretexc(self, *args, **kwargs):
|
|
log("%s(%r, %r)" % (meth, args, kwargs))
|
|
try:
|
|
ret = meth(self, *args, **kwargs)
|
|
except:
|
|
log('exception:\n%s' % (traceback.format_exc(),))
|
|
raise
|
|
log("ret: %r" % (ret, ))
|
|
return ret
|
|
inner_logargsretexc.__name__ = '<logwrap(%s)>' % (meth,)
|
|
return inner_logargsretexc
|
|
|
|
def logexc(meth):
|
|
def inner_logexc(self, *args, **kwargs):
|
|
try:
|
|
ret = meth(self, *args, **kwargs)
|
|
except TFSIOError, tie:
|
|
log('error: %s' % (tie,))
|
|
raise
|
|
except:
|
|
log('exception:\n%s' % (traceback.format_exc(),))
|
|
raise
|
|
return ret
|
|
inner_logexc.__name__ = '<logwrap(%s)>' % (meth,)
|
|
return inner_logexc
|
|
|
|
def log_exc():
|
|
log('exception:\n%s' % (traceback.format_exc(),))
|
|
|
|
def repr_mode(mode=None):
|
|
if mode is None:
|
|
return 'none'
|
|
fields = ['S_ENFMT', 'S_IFBLK', 'S_IFCHR', 'S_IFDIR', 'S_IFIFO', 'S_IFLNK', 'S_IFREG', 'S_IFSOCK', 'S_IRGRP', 'S_IROTH', 'S_IRUSR', 'S_IRWXG', 'S_IRWXO', 'S_IRWXU', 'S_ISGID', 'S_ISUID', 'S_ISVTX', 'S_IWGRP', 'S_IWOTH', 'S_IWUSR', 'S_IXGRP', 'S_IXOTH', 'S_IXUSR']
|
|
ret = []
|
|
for field in fields:
|
|
fval = getattr(stat, field)
|
|
if (mode & fval) == fval:
|
|
ret.append(field)
|
|
return '|'.join(ret)
|
|
|
|
def repr_flags(flags=None):
|
|
if flags is None:
|
|
return 'none'
|
|
fields = [ 'O_APPEND', 'O_CREAT', 'O_DIRECT', 'O_DIRECTORY', 'O_EXCL', 'O_EXLOCK',
|
|
'O_LARGEFILE', 'O_NDELAY', 'O_NOCTTY', 'O_NOFOLLOW', 'O_NONBLOCK', 'O_RDWR',
|
|
'O_SHLOCK', 'O_SYNC', 'O_TRUNC', 'O_WRONLY', ]
|
|
ret = []
|
|
for field in fields:
|
|
fval = getattr(os, field, None)
|
|
if fval is not None and (flags & fval) == fval:
|
|
ret.append(field)
|
|
if not ret:
|
|
ret = ['O_RDONLY']
|
|
return '|'.join(ret)
|
|
|
|
class DownloaderWithReadQueue(object):
|
|
def __init__(self):
|
|
self.read_heap = []
|
|
self.dest_file_name = None
|
|
self.running = False
|
|
self.done_observer = observer.OneShotObserverList()
|
|
|
|
def __repr__(self):
|
|
name = self.dest_file_name is None and '<none>' or os.path.basename(self.dest_file_name)
|
|
return "<DWRQ(%s)> q(%s)" % (name, len(self.read_heap or []))
|
|
|
|
def log(self, msg):
|
|
log("%r: %s" % (self, msg))
|
|
|
|
@logexc
|
|
def start(self, url, dest_file_name, target_size, interval=0.5):
|
|
self.log('start(%s, %s, %s)' % (url, dest_file_name, target_size, ))
|
|
self.dest_file_name = dest_file_name
|
|
file(self.dest_file_name, 'wb').close() # touch
|
|
self.target_size = target_size
|
|
self.log('start()')
|
|
self.loop = task.LoopingCall(self._check_file_size)
|
|
self.loop.start(interval)
|
|
self.running = True
|
|
d = client.downloadPage(url, self.dest_file_name)
|
|
d.addCallbacks(self.done, self.fail)
|
|
return d
|
|
|
|
def when_done(self):
|
|
return self.done_observer.when_fired()
|
|
|
|
def get_size(self):
|
|
if os.path.exists(self.dest_file_name):
|
|
return os.path.getsize(self.dest_file_name)
|
|
else:
|
|
return 0
|
|
|
|
@logexc
|
|
def _read(self, posn, size):
|
|
#self.log('_read(%s, %s)' % (posn, size))
|
|
f = file(self.dest_file_name, 'rb')
|
|
f.seek(posn)
|
|
data = f.read(size)
|
|
f.close()
|
|
return data
|
|
|
|
@logexc
|
|
def read(self, posn, size):
|
|
self.log('read(%s, %s)' % (posn, size))
|
|
if self.read_heap is None:
|
|
raise ValueError('read() called when already shut down')
|
|
if posn+size > self.target_size:
|
|
size -= self.target_size - posn
|
|
fsize = self.get_size()
|
|
if posn+size < fsize:
|
|
return defer.succeed(self._read(posn, size))
|
|
else:
|
|
d = defer.Deferred()
|
|
dread = (posn+size, posn, d)
|
|
heapq.heappush(self.read_heap, dread)
|
|
return d
|
|
|
|
@logexc
|
|
def _check_file_size(self):
|
|
#self.log('_check_file_size()')
|
|
if self.read_heap:
|
|
try:
|
|
size = self.get_size()
|
|
while self.read_heap and self.read_heap[0][0] <= size:
|
|
end, start, d = heapq.heappop(self.read_heap)
|
|
data = self._read(start, end-start)
|
|
d.callback(data)
|
|
except Exception, e:
|
|
log_exc()
|
|
failure = Failure()
|
|
|
|
@logexc
|
|
def fail(self, failure):
|
|
self.log('fail(%s)' % (failure,))
|
|
self.running = False
|
|
if self.loop.running:
|
|
self.loop.stop()
|
|
# fail any reads still pending
|
|
for end, start, d in self.read_heap:
|
|
reactor.callLater(0, d.errback, failure)
|
|
self.read_heap = None
|
|
self.done_observer.fire_if_not_fired(failure)
|
|
return failure
|
|
|
|
@logexc
|
|
def done(self, result):
|
|
self.log('done()')
|
|
self.running = False
|
|
if self.loop.running:
|
|
self.loop.stop()
|
|
precondition(self.get_size() == self.target_size, self.get_size(), self.target_size)
|
|
self._check_file_size() # process anything left pending in heap
|
|
precondition(not self.read_heap, self.read_heap, self.target_size, self.get_size())
|
|
self.read_heap = None
|
|
self.done_observer.fire_if_not_fired(self)
|
|
return result
|
|
|
|
|
|
class TahoeFuseFile(object):
|
|
|
|
#def __init__(self, path, flags, *mode):
|
|
def __init__(self, tfs, path, flags, *mode):
|
|
log("TFF: __init__(%r, %r:%s, %r:%s)" % (path, flags, repr_flags(flags), mode, repr_mode(*mode)))
|
|
self.tfs = tfs
|
|
self.downloader = None
|
|
|
|
self._path = path # for tahoe put
|
|
try:
|
|
self.parent, self.name, self.fnode = self.tfs.get_parent_name_and_child(path)
|
|
m = flag2mode(flags)
|
|
log('TFF: flags2(mode) -> %s' % (m,))
|
|
if m[0] in 'wa':
|
|
# write
|
|
self.fname = self.tfs.cache.tmp_file(os.urandom(20))
|
|
if self.fnode is None:
|
|
log('TFF: [%s] open() for write: no file node, creating new File %s' % (self.name, self.fname, ))
|
|
self.fnode = File(0, LiteralFileURI.BASE_STRING)
|
|
self.fnode.tmp_fname = self.fname # XXX kill this
|
|
self.parent.add_child(self.name, self.fnode, {})
|
|
elif hasattr(self.fnode, 'tmp_fname'):
|
|
self.fname = self.fnode.tmp_fname
|
|
log('TFF: [%s] open() for write: existing file node lists %s' % (self.name, self.fname, ))
|
|
else:
|
|
log('TFF: [%s] open() for write: existing file node lists no tmp_file, using new %s' % (self.name, self.fname, ))
|
|
if mode != (0600,):
|
|
log('TFF: [%s] changing mode %s(%s) to 0600' % (self.name, repr_mode(*mode), mode))
|
|
mode = (0600,)
|
|
log('TFF: [%s] opening(%s) with flags %s(%s), mode %s(%s)' % (self.name, self.fname, repr_flags(flags|os.O_CREAT), flags|os.O_CREAT, repr_mode(*mode), mode))
|
|
#self.file = os.fdopen(os.open(self.fname, flags|os.O_CREAT, *mode), m)
|
|
self.file = os.fdopen(os.open(self.fname, flags|os.O_CREAT, *mode), m)
|
|
self.fd = self.file.fileno()
|
|
log('TFF: opened(%s) for write' % self.fname)
|
|
self.open_for_write = True
|
|
else:
|
|
# read
|
|
assert self.fnode is not None
|
|
uri = self.fnode.get_uri()
|
|
|
|
# XXX make this go away
|
|
if hasattr(self.fnode, 'tmp_fname'):
|
|
self.fname = self.fnode.tmp_fname
|
|
log('TFF: reopening(%s) for reading' % self.fname)
|
|
else:
|
|
if is_literal_file_uri(uri) or not self.tfs.async:
|
|
log('TFF: synchronously fetching file from cache for reading')
|
|
self.fname = self.tfs.cache.get_file(uri)
|
|
else:
|
|
log('TFF: asynchronously fetching file from cache for reading')
|
|
self.fname, self.downloader = self.tfs.cache.async_get_file(uri)
|
|
# downloader is None if the cache already contains the file
|
|
if self.downloader is not None:
|
|
d = self.downloader.when_done()
|
|
def download_complete(junk):
|
|
# once the download is complete, revert to non-async behaviour
|
|
self.downloader = None
|
|
d.addCallback(download_complete)
|
|
|
|
self.file = os.fdopen(os.open(self.fname, flags, *mode), m)
|
|
self.fd = self.file.fileno()
|
|
self.open_for_write = False
|
|
log('TFF: opened(%s) for read' % self.fname)
|
|
except:
|
|
log_exc()
|
|
raise
|
|
|
|
def log(self, msg):
|
|
log("<TFF(%s:%s)> %s" % (os.path.basename(self.fname), self.name, msg))
|
|
|
|
@logexc
|
|
def read(self, size, offset):
|
|
self.log('read(%r, %r)' % (size, offset, ))
|
|
if self.downloader:
|
|
# then we're busy doing an async download
|
|
# (and hence implicitly, we're in an environment that supports twisted)
|
|
#self.log('passing read() to %s' % (self.downloader, ))
|
|
d = self.downloader.read(offset, size)
|
|
def thunk(failure):
|
|
raise EIO(str(failure))
|
|
d.addErrback(thunk)
|
|
return d
|
|
else:
|
|
self.log('servicing read() from %s' % (self.file, ))
|
|
self.file.seek(offset)
|
|
return self.file.read(size)
|
|
|
|
@logexc
|
|
def write(self, buf, offset):
|
|
self.log("write(-%s-, %r)" % (len(buf), offset))
|
|
if not self.open_for_write:
|
|
return -errno.EACCES
|
|
self.file.seek(offset)
|
|
self.file.write(buf)
|
|
return len(buf)
|
|
|
|
@logexc
|
|
def release(self, flags):
|
|
self.log("release(%r)" % (flags,))
|
|
self.file.close()
|
|
if self.open_for_write:
|
|
size = os.path.getsize(self.fname)
|
|
self.fnode.size = size
|
|
file_cap = self.tfs.upload(self.fname)
|
|
self.fnode.ro_uri = file_cap
|
|
# XXX [ ] TODO: set metadata
|
|
# write new uri into parent dir entry
|
|
self.parent.add_child(self.name, self.fnode, {})
|
|
self.log("uploaded: %s" % (file_cap,))
|
|
|
|
# dbg
|
|
print_tree()
|
|
|
|
def _fflush(self):
|
|
if 'w' in self.file.mode or 'a' in self.file.mode:
|
|
self.file.flush()
|
|
|
|
@logexc
|
|
def fsync(self, isfsyncfile):
|
|
self.log("fsync(%r)" % (isfsyncfile,))
|
|
self._fflush()
|
|
if isfsyncfile and hasattr(os, 'fdatasync'):
|
|
os.fdatasync(self.fd)
|
|
else:
|
|
os.fsync(self.fd)
|
|
|
|
@logexc
|
|
def flush(self):
|
|
self.log("flush()")
|
|
self._fflush()
|
|
# cf. xmp_flush() in fusexmp_fh.c
|
|
os.close(os.dup(self.fd))
|
|
|
|
@logexc
|
|
def fgetattr(self):
|
|
self.log("fgetattr()")
|
|
s = os.fstat(self.fd)
|
|
d = stat_to_dict(s)
|
|
if self.downloader:
|
|
size = self.downloader.target_size
|
|
self.log("fgetattr() during async download, cache file: %s, size=%s" % (s, size))
|
|
d['st_size'] = size
|
|
self.log("fgetattr() -> %r" % (d,))
|
|
return d
|
|
|
|
@logexc
|
|
def ftruncate(self, len):
|
|
self.log("ftruncate(%r)" % (len,))
|
|
self.file.truncate(len)
|
|
|
|
class TahoeFuseBase(object):
|
|
|
|
def __init__(self, tfs):
|
|
log("TFB: __init__()")
|
|
self.tfs = tfs
|
|
self.files = {}
|
|
|
|
def log(self, msg):
|
|
log("<TFB> %s" % (msg, ))
|
|
|
|
@logexc
|
|
def readlink(self, path):
|
|
self.log("readlink(%r)" % (path,))
|
|
node = self.tfs.get_path(path)
|
|
if node:
|
|
raise EINVAL('Not a symlink') # nothing in tahoe is a symlink
|
|
else:
|
|
raise ENOENT('Invalid argument')
|
|
|
|
@logexc
|
|
def unlink(self, path):
|
|
self.log("unlink(%r)" % (path,))
|
|
self.tfs.unlink(path)
|
|
|
|
@logexc
|
|
def rmdir(self, path):
|
|
self.log("rmdir(%r)" % (path,))
|
|
self.tfs.unlink(path)
|
|
|
|
@logexc
|
|
def symlink(self, path, path1):
|
|
self.log("symlink(%r, %r)" % (path, path1))
|
|
self.tfs.link(path, path1)
|
|
|
|
@logexc
|
|
def rename(self, path, path1):
|
|
self.log("rename(%r, %r)" % (path, path1))
|
|
self.tfs.rename(path, path1)
|
|
|
|
@logexc
|
|
def link(self, path, path1):
|
|
self.log("link(%r, %r)" % (path, path1))
|
|
self.tfs.link(path, path1)
|
|
|
|
@logexc
|
|
def chmod(self, path, mode):
|
|
self.log("XX chmod(%r, %r)" % (path, mode))
|
|
#return -errno.EOPNOTSUPP
|
|
|
|
@logexc
|
|
def chown(self, path, user, group):
|
|
self.log("XX chown(%r, %r, %r)" % (path, user, group))
|
|
#return -errno.EOPNOTSUPP
|
|
|
|
@logexc
|
|
def truncate(self, path, len):
|
|
self.log("XX truncate(%r, %r)" % (path, len))
|
|
#return -errno.EOPNOTSUPP
|
|
|
|
@logexc
|
|
def utime(self, path, times):
|
|
self.log("XX utime(%r, %r)" % (path, times))
|
|
#return -errno.EOPNOTSUPP
|
|
|
|
@logexc
|
|
def statfs(self):
|
|
self.log("statfs()")
|
|
"""
|
|
Should return an object with statvfs attributes (f_bsize, f_frsize...).
|
|
Eg., the return value of os.statvfs() is such a thing (since py 2.2).
|
|
If you are not reusing an existing statvfs object, start with
|
|
fuse.StatVFS(), and define the attributes.
|
|
|
|
To provide usable information (ie., you want sensible df(1)
|
|
output, you are suggested to specify the following attributes:
|
|
|
|
- f_bsize - preferred size of file blocks, in bytes
|
|
- f_frsize - fundamental size of file blcoks, in bytes
|
|
[if you have no idea, use the same as blocksize]
|
|
- f_blocks - total number of blocks in the filesystem
|
|
- f_bfree - number of free blocks
|
|
- f_files - total number of file inodes
|
|
- f_ffree - nunber of free file inodes
|
|
"""
|
|
|
|
block_size = 4096 # 4k
|
|
preferred_block_size = 131072 # 128k, c.f. seg_size
|
|
fs_size = 8*2**40 # 8Tb
|
|
fs_free = 2*2**40 # 2Tb
|
|
|
|
#s = fuse.StatVfs(f_bsize = preferred_block_size,
|
|
s = dict(f_bsize = preferred_block_size,
|
|
f_frsize = block_size,
|
|
f_blocks = fs_size / block_size,
|
|
f_bfree = fs_free / block_size,
|
|
f_bavail = fs_free / block_size,
|
|
f_files = 2**30, # total files
|
|
f_ffree = 2**20, # available files
|
|
f_favail = 2**20, # available files (root)
|
|
f_flag = 2, # no suid
|
|
f_namemax = 255) # max name length
|
|
#self.log('statfs(): %r' % (s,))
|
|
return s
|
|
|
|
def fsinit(self):
|
|
self.log("fsinit()")
|
|
|
|
##################################################################
|
|
|
|
@logexc
|
|
def readdir(self, path, offset):
|
|
self.log('readdir(%r, %r)' % (path, offset))
|
|
node = self.tfs.get_path(path)
|
|
if node is None:
|
|
return -errno.ENOENT
|
|
dirlist = ['.', '..'] + node.children.keys()
|
|
self.log('dirlist = %r' % (dirlist,))
|
|
#return [fuse.Direntry(d) for d in dirlist]
|
|
return dirlist
|
|
|
|
@logexc
|
|
def getattr(self, path):
|
|
self.log('getattr(%r)' % (path,))
|
|
|
|
if path == '/':
|
|
# we don't have any metadata for the root (no edge leading to it)
|
|
mode = (stat.S_IFDIR | 755)
|
|
mtime = self.tfs.root.mtime
|
|
s = TStat({}, st_mode=mode, st_nlink=1, st_mtime=mtime)
|
|
self.log('getattr(%r) -> %r' % (path, s))
|
|
#return s
|
|
return stat_to_dict(s)
|
|
|
|
parent, name, child = self.tfs.get_parent_name_and_child(path)
|
|
if not child: # implicitly 'or not parent'
|
|
raise ENOENT('No such file or directory')
|
|
return stat_to_dict(parent.get_stat(name))
|
|
|
|
@logexc
|
|
def access(self, path, mode):
|
|
self.log("access(%r, %r)" % (path, mode))
|
|
node = self.tfs.get_path(path)
|
|
if not node:
|
|
return -errno.ENOENT
|
|
accmode = os.O_RDONLY | os.O_WRONLY | os.O_RDWR
|
|
if (mode & 0222):
|
|
if not node.writable():
|
|
log('write access denied for %s (req:%o)' % (path, mode, ))
|
|
return -errno.EACCES
|
|
#else:
|
|
#log('access granted for %s' % (path, ))
|
|
|
|
@logexc
|
|
def mkdir(self, path, mode):
|
|
self.log("mkdir(%r, %r)" % (path, mode))
|
|
self.tfs.mkdir(path)
|
|
|
|
##################################################################
|
|
# file methods
|
|
|
|
def open(self, path, flags):
|
|
self.log('open(%r, %r)' % (path, flags, ))
|
|
if path in self.files:
|
|
# XXX todo [ ] should consider concurrent open files of differing modes
|
|
return
|
|
else:
|
|
tffobj = TahoeFuseFile(self.tfs, path, flags)
|
|
self.files[path] = tffobj
|
|
|
|
def create(self, path, flags, mode):
|
|
self.log('create(%r, %r, %r)' % (path, flags, mode))
|
|
if path in self.files:
|
|
# XXX todo [ ] should consider concurrent open files of differing modes
|
|
return
|
|
else:
|
|
tffobj = TahoeFuseFile(self.tfs, path, flags, mode)
|
|
self.files[path] = tffobj
|
|
|
|
def _get_file(self, path):
|
|
if not path in self.files:
|
|
raise ENOENT('No such file or directory: %s' % (path,))
|
|
return self.files[path]
|
|
|
|
##
|
|
|
|
def read(self, path, size, offset):
|
|
self.log('read(%r, %r, %r)' % (path, size, offset, ))
|
|
return self._get_file(path).read(size, offset)
|
|
|
|
@logexc
|
|
def write(self, path, buf, offset):
|
|
self.log("write(%r, -%s-, %r)" % (path, len(buf), offset))
|
|
return self._get_file(path).write(buf, offset)
|
|
|
|
@logexc
|
|
def release(self, path, flags):
|
|
self.log("release(%r, %r)" % (path, flags,))
|
|
self._get_file(path).release(flags)
|
|
del self.files[path]
|
|
|
|
@logexc
|
|
def fsync(self, path, isfsyncfile):
|
|
self.log("fsync(%r, %r)" % (path, isfsyncfile,))
|
|
return self._get_file(path).fsync(isfsyncfile)
|
|
|
|
@logexc
|
|
def flush(self, path):
|
|
self.log("flush(%r)" % (path,))
|
|
return self._get_file(path).flush()
|
|
|
|
@logexc
|
|
def fgetattr(self, path):
|
|
self.log("fgetattr(%r)" % (path,))
|
|
return self._get_file(path).fgetattr()
|
|
|
|
@logexc
|
|
def ftruncate(self, path, len):
|
|
self.log("ftruncate(%r, %r)" % (path, len,))
|
|
return self._get_file(path).ftruncate(len)
|
|
|
|
class TahoeFuseLocal(TahoeFuseBase, fuse.Fuse):
|
|
def __init__(self, tfs, *args, **kw):
|
|
log("TFL: __init__(%r, %r)" % (args, kw))
|
|
TahoeFuseBase.__init__(self, tfs)
|
|
fuse.Fuse.__init__(self, *args, **kw)
|
|
|
|
def log(self, msg):
|
|
log("<TFL> %s" % (msg, ))
|
|
|
|
def main(self, *a, **kw):
|
|
self.log("main(%r, %r)" % (a, kw))
|
|
return fuse.Fuse.main(self, *a, **kw)
|
|
|
|
# overrides for those methods which return objects not marshalled
|
|
def fgetattr(self, path):
|
|
return TStat({}, **(TahoeFuseBase.fgetattr(self, path)))
|
|
|
|
def getattr(self, path):
|
|
return TStat({}, **(TahoeFuseBase.getattr(self, path)))
|
|
|
|
def statfs(self):
|
|
return fuse.StatVfs(**(TahoeFuseBase.statfs(self)))
|
|
#self.log('statfs()')
|
|
#ret = fuse.StatVfs(**(TahoeFuseBase.statfs(self)))
|
|
#self.log('statfs(): %r' % (ret,))
|
|
#return ret
|
|
|
|
@logexc
|
|
def readdir(self, path, offset):
|
|
return [ fuse.Direntry(d) for d in TahoeFuseBase.readdir(self, path, offset) ]
|
|
|
|
class TahoeFuseShim(fuse.Fuse):
|
|
def __init__(self, trpc, *args, **kw):
|
|
log("TF: __init__(%r, %r)" % (args, kw))
|
|
self.trpc = trpc
|
|
fuse.Fuse.__init__(self, *args, **kw)
|
|
|
|
def log(self, msg):
|
|
log("<TFs> %s" % (msg, ))
|
|
|
|
@logexc
|
|
def readlink(self, path):
|
|
self.log("readlink(%r)" % (path,))
|
|
return self.trpc.call('readlink', path)
|
|
|
|
@logexc
|
|
def unlink(self, path):
|
|
self.log("unlink(%r)" % (path,))
|
|
return self.trpc.call('unlink', path)
|
|
|
|
@logexc
|
|
def rmdir(self, path):
|
|
self.log("rmdir(%r)" % (path,))
|
|
return self.trpc.call('unlink', path)
|
|
|
|
@logexc
|
|
def symlink(self, path, path1):
|
|
self.log("symlink(%r, %r)" % (path, path1))
|
|
return self.trpc.call('link', path, path1)
|
|
|
|
@logexc
|
|
def rename(self, path, path1):
|
|
self.log("rename(%r, %r)" % (path, path1))
|
|
return self.trpc.call('rename', path, path1)
|
|
|
|
@logexc
|
|
def link(self, path, path1):
|
|
self.log("link(%r, %r)" % (path, path1))
|
|
return self.trpc.call('link', path, path1)
|
|
|
|
@logexc
|
|
def chmod(self, path, mode):
|
|
self.log("XX chmod(%r, %r)" % (path, mode))
|
|
return self.trpc.call('chmod', path, mode)
|
|
|
|
@logexc
|
|
def chown(self, path, user, group):
|
|
self.log("XX chown(%r, %r, %r)" % (path, user, group))
|
|
return self.trpc.call('chown', path, user, group)
|
|
|
|
@logexc
|
|
def truncate(self, path, len):
|
|
self.log("XX truncate(%r, %r)" % (path, len))
|
|
return self.trpc.call('truncate', path, len)
|
|
|
|
@logexc
|
|
def utime(self, path, times):
|
|
self.log("XX utime(%r, %r)" % (path, times))
|
|
return self.trpc.call('utime', path, times)
|
|
|
|
@logexc
|
|
def statfs(self):
|
|
self.log("statfs()")
|
|
response = self.trpc.call('statfs')
|
|
#self.log("statfs(): %r" % (response,))
|
|
kwargs = dict([ (str(k),v) for k,v in response.items() ])
|
|
return fuse.StatVfs(**kwargs)
|
|
|
|
def fsinit(self):
|
|
self.log("fsinit()")
|
|
|
|
def main(self, *a, **kw):
|
|
self.log("main(%r, %r)" % (a, kw))
|
|
|
|
return fuse.Fuse.main(self, *a, **kw)
|
|
|
|
##################################################################
|
|
|
|
@logexc
|
|
def readdir(self, path, offset):
|
|
self.log('readdir(%r, %r)' % (path, offset))
|
|
return [ fuse.Direntry(d) for d in self.trpc.call('readdir', path, offset) ]
|
|
|
|
@logexc
|
|
def getattr(self, path):
|
|
self.log('getattr(%r)' % (path,))
|
|
response = self.trpc.call('getattr', path)
|
|
kwargs = dict([ (str(k),v) for k,v in response.items() ])
|
|
s = TStat({}, **kwargs)
|
|
self.log('getattr(%r) -> %r' % (path, s))
|
|
return s
|
|
|
|
@logexc
|
|
def access(self, path, mode):
|
|
self.log("access(%r, %r)" % (path, mode))
|
|
return self.trpc.call('access', path, mode)
|
|
|
|
@logexc
|
|
def mkdir(self, path, mode):
|
|
self.log("mkdir(%r, %r)" % (path, mode))
|
|
return self.trpc.call('mkdir', path, mode)
|
|
|
|
##################################################################
|
|
# file methods
|
|
|
|
def open(self, path, flags):
|
|
self.log('open(%r, %r)' % (path, flags, ))
|
|
return self.trpc.call('open', path, flags)
|
|
|
|
def create(self, path, flags, mode):
|
|
self.log('create(%r, %r, %r)' % (path, flags, mode))
|
|
return self.trpc.call('create', path, flags, mode)
|
|
|
|
##
|
|
|
|
def read(self, path, size, offset):
|
|
self.log('read(%r, %r, %r)' % (path, size, offset, ))
|
|
return self.trpc.call('read', path, size, offset)
|
|
|
|
@logexc
|
|
def write(self, path, buf, offset):
|
|
self.log("write(%r, -%s-, %r)" % (path, len(buf), offset))
|
|
return self.trpc.call('write', path, buf, offset)
|
|
|
|
@logexc
|
|
def release(self, path, flags):
|
|
self.log("release(%r, %r)" % (path, flags,))
|
|
return self.trpc.call('release', path, flags)
|
|
|
|
@logexc
|
|
def fsync(self, path, isfsyncfile):
|
|
self.log("fsync(%r, %r)" % (path, isfsyncfile,))
|
|
return self.trpc.call('fsync', path, isfsyncfile)
|
|
|
|
@logexc
|
|
def flush(self, path):
|
|
self.log("flush(%r)" % (path,))
|
|
return self.trpc.call('flush', path)
|
|
|
|
@logexc
|
|
def fgetattr(self, path):
|
|
self.log("fgetattr(%r)" % (path,))
|
|
#return self.trpc.call('fgetattr', path)
|
|
response = self.trpc.call('fgetattr', path)
|
|
kwargs = dict([ (str(k),v) for k,v in response.items() ])
|
|
s = TStat({}, **kwargs)
|
|
self.log('getattr(%r) -> %r' % (path, s))
|
|
return s
|
|
|
|
@logexc
|
|
def ftruncate(self, path, len):
|
|
self.log("ftruncate(%r, %r)" % (path, len,))
|
|
return self.trpc.call('ftruncate', path, len)
|
|
|
|
|
|
def launch_tahoe_fuse(tf_class, tobj, argv):
|
|
sys.argv = ['tahoe fuse'] + list(argv)
|
|
log('setting sys.argv=%r' % (sys.argv,))
|
|
config = TahoeFuseOptions()
|
|
version = "%prog " +VERSIONSTR+", fuse "+ fuse.__version__
|
|
server = tf_class(tobj, version=version, usage=config.getSynopsis(), dash_s_do='setsingle')
|
|
server.parse(errex=1)
|
|
server.main()
|
|
|
|
def getnodeurl(nodedir):
|
|
f = file(os.path.expanduser(os.path.join(nodedir, "node.url")), 'rb')
|
|
nu = f.read().strip()
|
|
f.close()
|
|
if nu[-1] != "/":
|
|
nu += "/"
|
|
return nu
|
|
|
|
def fingerprint(uri):
|
|
if uri is None:
|
|
return None
|
|
return base64.b32encode(sha.new(uri).digest()).lower()[:6]
|
|
|
|
stat_fields = [ 'st_mode', 'st_ino', 'st_dev', 'st_nlink', 'st_uid', 'st_gid', 'st_size',
|
|
'st_atime', 'st_mtime', 'st_ctime', ]
|
|
def stat_to_dict(statobj, fields=None):
|
|
if fields is None:
|
|
fields = stat_fields
|
|
d = {}
|
|
for f in fields:
|
|
d[f] = getattr(statobj, f, None)
|
|
return d
|
|
|
|
class TStat(fuse.Stat):
|
|
# in fuse 0.2, these are set by fuse.Stat.__init__
|
|
# in fuse 0.2-pre3 (hardy) they are not. badness ensues if they're missing
|
|
st_mode = None
|
|
st_ino = 0
|
|
st_dev = 0
|
|
st_nlink = None
|
|
st_uid = 0
|
|
st_gid = 0
|
|
st_size = 0
|
|
st_atime = 0
|
|
st_mtime = 0
|
|
st_ctime = 0
|
|
|
|
fields = [ 'st_mode', 'st_ino', 'st_dev', 'st_nlink', 'st_uid', 'st_gid', 'st_size',
|
|
'st_atime', 'st_mtime', 'st_ctime', ]
|
|
def __init__(self, metadata, **kwargs):
|
|
# first load any stat fields present in 'metadata'
|
|
for st in [ 'mtime', 'ctime' ]:
|
|
if st in metadata:
|
|
setattr(self, "st_%s" % st, metadata[st])
|
|
for st in self.fields:
|
|
if st in metadata:
|
|
setattr(self, st, metadata[st])
|
|
|
|
# then set any values passed in as kwargs
|
|
fuse.Stat.__init__(self, **kwargs)
|
|
|
|
def __repr__(self):
|
|
return "<Stat%r>" % (stat_to_dict(self),)
|
|
|
|
class Directory(object):
|
|
def __init__(self, tfs, ro_uri, rw_uri):
|
|
self.tfs = tfs
|
|
self.ro_uri = ro_uri
|
|
self.rw_uri = rw_uri
|
|
assert (rw_uri or ro_uri)
|
|
self.children = {}
|
|
self.last_load = None
|
|
self.last_data = None
|
|
self.mtime = 0
|
|
|
|
def __repr__(self):
|
|
return "<Directory %s>" % (fingerprint(self.get_uri()),)
|
|
|
|
def maybe_refresh(self, name=None):
|
|
"""
|
|
if the previously cached data was retrieved within the cache
|
|
validity period, does nothing. otherwise refetches the data
|
|
for this directory and reloads itself
|
|
"""
|
|
now = time.time()
|
|
if self.last_load is None or (now - self.last_load) > self.tfs.cache_validity:
|
|
self.load(name)
|
|
|
|
def load(self, name=None):
|
|
now = time.time()
|
|
log('%s.loading(%s)' % (self, name))
|
|
url = self.tfs.compose_url("uri/%s?t=json", self.get_uri())
|
|
data = urllib.urlopen(url).read()
|
|
h = tagged_hash('cache_hash', data)
|
|
if h == self.last_data:
|
|
self.last_load = now
|
|
log('%s.load() : no change h(data)=%s' % (self, base32.b2a(h), ))
|
|
return
|
|
try:
|
|
parsed = simplejson.loads(data)
|
|
except ValueError:
|
|
log('%s.load(): unable to parse json data for dir:\n%r' % (self, data))
|
|
return
|
|
nodetype, d = parsed
|
|
assert nodetype == 'dirnode'
|
|
self.children.clear()
|
|
for cname,details in d['children'].items():
|
|
cname = unicode_to_utf8_or_str(cname)
|
|
ctype, cattrs = details
|
|
metadata = cattrs.get('metadata', {})
|
|
if ctype == 'dirnode':
|
|
cobj = self.tfs.dir_for(cname, cattrs.get('ro_uri'), cattrs.get('rw_uri'))
|
|
else:
|
|
assert ctype == "filenode"
|
|
cobj = File(cattrs.get('size'), cattrs.get('ro_uri'))
|
|
self.children[cname] = cobj, metadata
|
|
self.last_load = now
|
|
self.last_data = h
|
|
self.mtime = now
|
|
log('%s.load() loaded: \n%s' % (self, self.pprint(),))
|
|
|
|
def get_children(self):
|
|
return self.children.keys()
|
|
|
|
def get_child(self, name):
|
|
return self.children[name][0]
|
|
|
|
def add_child(self, name, child, metadata):
|
|
log('%s.add_child(%r, %r, %r)' % (self, name, child, metadata, ))
|
|
self.children[name] = child, metadata
|
|
url = self.tfs.compose_url("uri/%s/%s?t=uri", self.get_uri(), name)
|
|
child_cap = do_http('PUT', url, child.get_uri())
|
|
# XXX [ ] TODO: push metadata to tahoe node
|
|
assert child_cap == child.get_uri()
|
|
self.mtime = time.time()
|
|
log('added child %r with %r to %r' % (name, child_cap, self))
|
|
|
|
def remove_child(self, name):
|
|
log('%s.remove_child(%r)' % (self, name, ))
|
|
del self.children[name]
|
|
url = self.tfs.compose_url("uri/%s/%s", self.get_uri(), name)
|
|
resp = do_http('DELETE', url)
|
|
self.mtime = time.time()
|
|
log('child (%s) removal yielded %r' % (name, resp,))
|
|
|
|
def get_uri(self):
|
|
return self.rw_uri or self.ro_uri
|
|
|
|
# TODO: rename to 'is_writeable', or switch sense to 'is_readonly', for consistency with Tahoe code
|
|
def writable(self):
|
|
return self.rw_uri and self.rw_uri != self.ro_uri
|
|
|
|
def pprint(self, prefix='', printed=None, suffix=''):
|
|
ret = []
|
|
if printed is None:
|
|
printed = set()
|
|
writable = self.writable() and '+' or ' '
|
|
if self in printed:
|
|
ret.append(" %s/%s ... <%s> : %s" % (prefix, writable, fingerprint(self.get_uri()), suffix, ))
|
|
else:
|
|
ret.append("[%s] %s/%s : %s" % (fingerprint(self.get_uri()), prefix, writable, suffix, ))
|
|
printed.add(self)
|
|
for name,(child,metadata) in sorted(self.children.items()):
|
|
ret.append(child.pprint(' ' * (len(prefix)+1)+name, printed, repr(metadata)))
|
|
return '\n'.join(ret)
|
|
|
|
def get_metadata(self, name):
|
|
return self.children[name][1]
|
|
|
|
def get_stat(self, name):
|
|
child,metadata = self.children[name]
|
|
log("%s.get_stat(%s) md: %r" % (self, name, metadata))
|
|
|
|
if isinstance(child, Directory):
|
|
child.maybe_refresh(name)
|
|
mode = metadata.get('st_mode') or (stat.S_IFDIR | 0755)
|
|
s = TStat(metadata, st_mode=mode, st_nlink=1, st_mtime=child.mtime)
|
|
else:
|
|
if hasattr(child, 'tmp_fname'):
|
|
s = os.stat(child.tmp_fname)
|
|
log("%s.get_stat(%s) returning local stat of tmp file" % (self, name, ))
|
|
else:
|
|
s = TStat(metadata,
|
|
st_nlink = 1,
|
|
st_size = child.size,
|
|
st_mode = metadata.get('st_mode') or (stat.S_IFREG | 0444),
|
|
st_mtime = metadata.get('mtime') or self.mtime,
|
|
)
|
|
return s
|
|
|
|
log("%s.get_stat(%s)->%s" % (self, name, s))
|
|
return s
|
|
|
|
class File(object):
|
|
def __init__(self, size, ro_uri):
|
|
self.size = size
|
|
if ro_uri:
|
|
ro_uri = str(ro_uri)
|
|
self.ro_uri = ro_uri
|
|
|
|
def __repr__(self):
|
|
return "<File %s>" % (fingerprint(self.ro_uri) or [self.tmp_fname],)
|
|
|
|
def pprint(self, prefix='', printed=None, suffix=''):
|
|
return " %s (%s) : %s" % (prefix, self.size, suffix, )
|
|
|
|
def get_uri(self):
|
|
return self.ro_uri
|
|
|
|
def writable(self):
|
|
return True
|
|
|
|
class TFS(object):
|
|
def __init__(self, nodedir, nodeurl, root_uri,
|
|
cache_validity_period=DEFAULT_DIRECTORY_VALIDITY, async=False):
|
|
self.cache_validity = cache_validity_period
|
|
self.nodeurl = nodeurl
|
|
self.root_uri = root_uri
|
|
self.async = async
|
|
self.dirs = {}
|
|
|
|
cachedir = os.path.expanduser(os.path.join(nodedir, '_cache'))
|
|
self.cache = FileCache(nodeurl, cachedir)
|
|
ro_uri = DirectoryURI.init_from_string(self.root_uri).get_readonly()
|
|
self.root = Directory(self, ro_uri, self.root_uri)
|
|
self.root.maybe_refresh('<root>')
|
|
|
|
def log(self, msg):
|
|
log("<TFS> %s" % (msg, ))
|
|
|
|
def pprint(self):
|
|
return self.root.pprint()
|
|
|
|
def compose_url(self, fmt, *args):
|
|
return self.nodeurl + (fmt % tuple(map(urllib.quote, args)))
|
|
|
|
def get_parent_name_and_child(self, path):
|
|
"""
|
|
find the parent dir node, name of child relative to that parent, and
|
|
child node within the TFS object space.
|
|
@returns: (parent, name, child) if the child is found
|
|
(parent, name, None) if the child is missing from the parent
|
|
(None, name, None) if the parent is not found
|
|
"""
|
|
if path == '/':
|
|
return
|
|
dirname, name = os.path.split(path)
|
|
parent = self.get_path(dirname)
|
|
if parent:
|
|
try:
|
|
child = parent.get_child(name)
|
|
return parent, name, child
|
|
except KeyError:
|
|
return parent, name, None
|
|
else:
|
|
return None, name, None
|
|
|
|
def get_path(self, path):
|
|
comps = path.strip('/').split('/')
|
|
if comps == ['']:
|
|
comps = []
|
|
cursor = self.root
|
|
c_name = '<root>'
|
|
for comp in comps:
|
|
if not isinstance(cursor, Directory):
|
|
self.log('path "%s" is not a dir' % (path,))
|
|
return None
|
|
cursor.maybe_refresh(c_name)
|
|
try:
|
|
cursor = cursor.get_child(comp)
|
|
c_name = comp
|
|
except KeyError:
|
|
self.log('path "%s" not found' % (path,))
|
|
return None
|
|
if isinstance(cursor, Directory):
|
|
cursor.maybe_refresh(c_name)
|
|
return cursor
|
|
|
|
def dir_for(self, name, ro_uri, rw_uri):
|
|
#self.log('dir_for(%s) [%s/%s]' % (name, fingerprint(ro_uri), fingerprint(rw_uri)))
|
|
if ro_uri:
|
|
ro_uri = str(ro_uri)
|
|
if rw_uri:
|
|
rw_uri = str(rw_uri)
|
|
uri = rw_uri or ro_uri
|
|
assert uri
|
|
dirobj = self.dirs.get(uri)
|
|
if not dirobj:
|
|
self.log('dir_for(%s) creating new Directory' % (name, ))
|
|
dirobj = Directory(self, ro_uri, rw_uri)
|
|
self.dirs[uri] = dirobj
|
|
return dirobj
|
|
|
|
def upload(self, fname):
|
|
self.log('upload(%r)' % (fname,))
|
|
fh = file(fname, 'rb')
|
|
url = self.compose_url("uri")
|
|
file_cap = do_http('PUT', url, fh)
|
|
self.log('uploaded to: %r' % (file_cap,))
|
|
return file_cap
|
|
|
|
def mkdir(self, path):
|
|
self.log('mkdir(%r)' % (path,))
|
|
parent, name, child = self.get_parent_name_and_child(path)
|
|
|
|
if child:
|
|
raise EEXIST('File exists: %s' % (name,))
|
|
if not parent:
|
|
raise ENOENT('No such file or directory: %s' % (path,))
|
|
|
|
url = self.compose_url("uri?t=mkdir")
|
|
new_dir_cap = do_http('PUT', url)
|
|
|
|
ro_uri = DirectoryURI.init_from_string(new_dir_cap).get_readonly()
|
|
child = Directory(self, ro_uri, new_dir_cap)
|
|
parent.add_child(name, child, {})
|
|
|
|
def rename(self, path, path1):
|
|
self.log('rename(%s, %s)' % (path, path1))
|
|
src_parent, src_name, src_child = self.get_parent_name_and_child(path)
|
|
dst_parent, dst_name, dst_child = self.get_parent_name_and_child(path1)
|
|
|
|
if not src_child or not dst_parent:
|
|
raise ENOENT('No such file or directory')
|
|
|
|
dst_parent.add_child(dst_name, src_child, {})
|
|
src_parent.remove_child(src_name)
|
|
|
|
def unlink(self, path):
|
|
parent, name, child = self.get_parent_name_and_child(path)
|
|
|
|
if child is None: # parent or child is missing
|
|
raise ENOENT('No such file or directory')
|
|
if not parent.writable():
|
|
raise EACCESS('Permission denied')
|
|
|
|
parent.remove_child(name)
|
|
|
|
def link(self, path, path1):
|
|
src = self.get_path(path)
|
|
dst_parent, dst_name, dst_child = self.get_parent_name_and_child(path1)
|
|
|
|
if not src:
|
|
raise ENOENT('No such file or directory')
|
|
if dst_parent is None:
|
|
raise ENOENT('No such file or directory')
|
|
if not dst_parent.writable():
|
|
raise EACCESS('Permission denied')
|
|
|
|
dst_parent.add_child(dst_name, src, {})
|
|
|
|
class FileCache(object):
|
|
def __init__(self, nodeurl, cachedir):
|
|
self.nodeurl = nodeurl
|
|
self.cachedir = cachedir
|
|
if not os.path.exists(self.cachedir):
|
|
os.makedirs(self.cachedir)
|
|
self.tmpdir = os.path.join(self.cachedir, 'tmp')
|
|
if not os.path.exists(self.tmpdir):
|
|
os.makedirs(self.tmpdir)
|
|
self.downloaders = weakref.WeakValueDictionary()
|
|
|
|
def log(self, msg):
|
|
log("<FC> %s" % (msg, ))
|
|
|
|
def get_file(self, uri):
|
|
self.log('get_file(%s)' % (uri,))
|
|
if is_literal_file_uri(uri):
|
|
return self.get_literal(uri)
|
|
else:
|
|
return self.get_chk(uri, async=False)
|
|
|
|
def async_get_file(self, uri):
|
|
self.log('get_file(%s)' % (uri,))
|
|
return self.get_chk(uri, async=True)
|
|
|
|
def get_literal(self, uri):
|
|
h = sha.new(uri).digest()
|
|
u = LiteralFileURI.init_from_string(uri)
|
|
fname = os.path.join(self.cachedir, '__'+base64.b32encode(h).lower())
|
|
size = len(u.data)
|
|
self.log('writing literal file %s (%s)' % (fname, size, ))
|
|
fh = open(fname, 'wb')
|
|
fh.write(u.data)
|
|
fh.close()
|
|
return fname
|
|
|
|
def get_chk(self, uri, async=False):
|
|
u = CHKFileURI.init_from_string(str(uri))
|
|
storage_index = u.storage_index
|
|
size = u.size
|
|
fname = os.path.join(self.cachedir, base64.b32encode(storage_index).lower())
|
|
if os.path.exists(fname):
|
|
fsize = os.path.getsize(fname)
|
|
if fsize == size:
|
|
if async:
|
|
return fname, None
|
|
else:
|
|
return fname
|
|
else:
|
|
self.log('warning file "%s" is too short %s < %s' % (fname, fsize, size))
|
|
self.log('downloading file %s (%s)' % (fname, size, ))
|
|
url = "%suri/%s" % (self.nodeurl, uri)
|
|
if async:
|
|
if fname in self.downloaders and self.downloaders[fname].running:
|
|
downloader = self.downloaders[fname]
|
|
else:
|
|
downloader = DownloaderWithReadQueue()
|
|
self.downloaders[fname] = downloader
|
|
d = downloader.start(url, fname, target_size=u.size)
|
|
def clear_downloader(result, fname):
|
|
self.log('clearing %s from downloaders: %r' % (fname, result))
|
|
self.downloaders.pop(fname, None)
|
|
d.addBoth(clear_downloader, fname)
|
|
return fname, downloader
|
|
else:
|
|
fh = open(fname, 'wb')
|
|
download = urllib.urlopen(url)
|
|
while True:
|
|
chunk = download.read(4096)
|
|
if not chunk:
|
|
break
|
|
fh.write(chunk)
|
|
fh.close()
|
|
return fname
|
|
|
|
def tmp_file(self, id):
|
|
fname = os.path.join(self.tmpdir, base64.b32encode(id).lower())
|
|
return fname
|
|
|
|
_tfs = None # to appease pyflakes; is set in main()
|
|
def print_tree():
|
|
log('tree:\n' + _tfs.pprint())
|
|
|
|
|
|
def unmarshal(obj):
|
|
if obj is None or isinstance(obj, int) or isinstance(obj, long) or isinstance(obj, float):
|
|
return obj
|
|
elif isinstance(obj, unicode) or isinstance(obj, str):
|
|
#log('unmarshal(%r)' % (obj,))
|
|
return base64.b64decode(obj)
|
|
elif isinstance(obj, list):
|
|
return map(unmarshal, obj)
|
|
elif isinstance(obj, dict):
|
|
return dict([ (k,unmarshal(v)) for k,v in obj.items() ])
|
|
else:
|
|
raise ValueError('object type not int,str,list,dict,none (%s) (%r)' % (type(obj), obj))
|
|
|
|
def marshal(obj):
|
|
if obj is None or isinstance(obj, int) or isinstance(obj, long) or isinstance(obj, float):
|
|
return obj
|
|
elif isinstance(obj, str):
|
|
return base64.b64encode(obj)
|
|
elif isinstance(obj, list) or isinstance(obj, tuple):
|
|
return map(marshal, obj)
|
|
elif isinstance(obj, dict):
|
|
return dict([ (k,marshal(v)) for k,v in obj.items() ])
|
|
else:
|
|
raise ValueError('object type not int,str,list,dict,none (%s)' % type(obj))
|
|
|
|
|
|
class TRPCProtocol(Protocol):
|
|
compute_response_sha1 = True
|
|
log_all_requests = False
|
|
|
|
def connectionMade(self):
|
|
self.buf = []
|
|
|
|
def dataReceived(self, data):
|
|
if data == 'keepalive\n':
|
|
log('keepalive connection on %r' % (self.transport,))
|
|
self.keepalive = True
|
|
return
|
|
|
|
if not data.endswith('\n'):
|
|
self.buf.append(data)
|
|
return
|
|
if self.buf:
|
|
self.buf.append(data)
|
|
reqstr = ''.join(self.buf)
|
|
self.buf = []
|
|
self.dispatch_request(reqstr)
|
|
else:
|
|
self.dispatch_request(data)
|
|
|
|
def dispatch_request(self, reqstr):
|
|
try:
|
|
req = simplejson.loads(reqstr)
|
|
except ValueError, ve:
|
|
log(ve)
|
|
return
|
|
|
|
d = defer.maybeDeferred(self.handle_request, req)
|
|
d.addCallback(self.send_response)
|
|
d.addErrback(self.send_error)
|
|
|
|
def send_error(self, failure):
|
|
log('failure: %s' % (failure,))
|
|
if failure.check(TFSIOError):
|
|
e = failure.value
|
|
self.send_response(['error', 'errno', e.args[0], e.args[1]])
|
|
else:
|
|
self.send_response(['error', 'failure', str(failure)])
|
|
|
|
def send_response(self, result):
|
|
response = simplejson.dumps(result)
|
|
header = { 'len': len(response), }
|
|
if self.compute_response_sha1:
|
|
header['sha1'] = base64.b64encode(sha.new(response).digest())
|
|
hdr = simplejson.dumps(header)
|
|
self.transport.write(hdr)
|
|
self.transport.write('\n')
|
|
self.transport.write(response)
|
|
self.transport.loseConnection()
|
|
|
|
def connectionLost(self, reason):
|
|
if hasattr(self, 'keepalive'):
|
|
log('keepalive connection %r lost, shutting down' % (self.transport,))
|
|
reactor.callLater(0, reactor.stop)
|
|
|
|
def handle_request(self, req):
|
|
if type(req) is not list or not req or len(req) < 1:
|
|
return ['error', 'malformed request']
|
|
if req[0] == 'call':
|
|
if len(req) < 3:
|
|
return ['error', 'malformed request']
|
|
methname = req[1]
|
|
try:
|
|
args = unmarshal(req[2])
|
|
except ValueError, ve:
|
|
return ['error', 'malformed arguments', str(ve)]
|
|
|
|
try:
|
|
meth = getattr(self.factory.server, methname)
|
|
except AttributeError, ae:
|
|
return ['error', 'no such method', str(ae)]
|
|
|
|
if self.log_all_requests:
|
|
log('call %s(%s)' % (methname, ', '.join(map(repr, args))))
|
|
try:
|
|
result = meth(*args)
|
|
except TFSIOError, e:
|
|
log('errno: %s; %s' % e.args)
|
|
return ['error', 'errno', e.args[0], e.args[1]]
|
|
except Exception, e:
|
|
log('exception: ' + traceback.format_exc())
|
|
return ['error', 'exception', str(e)]
|
|
d = defer.succeed(None)
|
|
d.addCallback(lambda junk: result) # result may be Deferred
|
|
d.addCallback(lambda res: ['result', marshal(res)]) # only applies if not errback
|
|
return d
|
|
|
|
class TFSServer(object):
|
|
def __init__(self, socket_path, server=None):
|
|
self.socket_path = socket_path
|
|
log('TFSServer init socket: %s' % (socket_path,))
|
|
|
|
self.factory = Factory()
|
|
self.factory.protocol = TRPCProtocol
|
|
if server:
|
|
self.factory.server = server
|
|
else:
|
|
self.factory.server = self
|
|
|
|
def get_service(self):
|
|
if not hasattr(self, 'svc'):
|
|
from twisted.application import strports
|
|
self.svc = strports.service('unix:'+self.socket_path, self.factory)
|
|
return self.svc
|
|
|
|
def run(self):
|
|
svc = self.get_service()
|
|
def ss():
|
|
try:
|
|
svc.startService()
|
|
except:
|
|
reactor.callLater(0, reactor.stop)
|
|
raise
|
|
reactor.callLater(0, ss)
|
|
reactor.run()
|
|
|
|
def hello(self):
|
|
return 'pleased to meet you'
|
|
|
|
def echo(self, arg):
|
|
return arg
|
|
|
|
def failex(self):
|
|
raise ValueError('expected')
|
|
|
|
def fail(self):
|
|
return defer.maybeDeferred(self.failex)
|
|
|
|
class RPCError(RuntimeError):
|
|
pass
|
|
|
|
class TRPC(object):
|
|
def __init__(self, socket_fname):
|
|
self.socket_fname = socket_fname
|
|
self.keepalive = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
self.keepalive.connect(self.socket_fname)
|
|
self.keepalive.send('keepalive\n')
|
|
log('requested keepalive on %s' % (self.keepalive,))
|
|
|
|
def req(self, req):
|
|
# open conenction to trpc server
|
|
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
s.connect(self.socket_fname)
|
|
# send request
|
|
s.send(simplejson.dumps(req))
|
|
s.send('\n')
|
|
# read response header
|
|
hdr_data = s.recv(8192)
|
|
first_newline = hdr_data.index('\n')
|
|
header = hdr_data[:first_newline]
|
|
data = hdr_data[first_newline+1:]
|
|
hdr = simplejson.loads(header)
|
|
hdr_len = hdr['len']
|
|
if hdr.has_key('sha1'):
|
|
hdr_sha1 = base64.b64decode(hdr['sha1'])
|
|
spool = [data]
|
|
spool_sha = sha.new(data)
|
|
# spool response
|
|
while True:
|
|
data = s.recv(8192)
|
|
if data:
|
|
spool.append(data)
|
|
spool_sha.update(data)
|
|
else:
|
|
break
|
|
else:
|
|
spool = [data]
|
|
# spool response
|
|
while True:
|
|
data = s.recv(8192)
|
|
if data:
|
|
spool.append(data)
|
|
else:
|
|
break
|
|
s.close()
|
|
# decode response
|
|
resp = ''.join(spool)
|
|
spool = None
|
|
assert hdr_len == len(resp), str((hdr_len, len(resp), repr(resp)))
|
|
if hdr.has_key('sha1'):
|
|
data_sha1 = spool_sha.digest()
|
|
spool = spool_sha = None
|
|
assert hdr_sha1 == data_sha1, str((base32.b2a(hdr_sha1), base32.b2a(data_sha1)))
|
|
#else:
|
|
#print 'warning, server provided no sha1 to check'
|
|
return resp
|
|
|
|
def call(self, methodname, *args):
|
|
res = self.req(['call', methodname, marshal(args)])
|
|
|
|
result = simplejson.loads(res)
|
|
if not result or len(result) < 2:
|
|
raise TypeError('malformed response %r' % (result,))
|
|
if result[0] == 'error':
|
|
if result[1] == 'errno':
|
|
raise TFSIOError(result[2], result[3])
|
|
else:
|
|
raise RPCError(*(result[1:])) # error, exception / error, failure
|
|
elif result[0] == 'result':
|
|
return unmarshal(result[1])
|
|
else:
|
|
raise TypeError('unknown response type %r' % (result[0],))
|
|
|
|
def shutdown(self):
|
|
log('shutdown() closing keepalive %s' % (self.keepalive,))
|
|
self.keepalive.close()
|
|
|
|
# (cut-n-pasted here due to an ImportError / some py2app linkage issues)
|
|
#from twisted.scripts._twistd_unix import daemonize
|
|
def daemonize():
|
|
# See http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16
|
|
if os.fork(): # launch child and...
|
|
os._exit(0) # kill off parent
|
|
os.setsid()
|
|
if os.fork(): # launch child and...
|
|
os._exit(0) # kill off parent again.
|
|
os.umask(077)
|
|
null=os.open('/dev/null', os.O_RDWR)
|
|
for i in range(3):
|
|
try:
|
|
os.dup2(null, i)
|
|
except OSError, e:
|
|
if e.errno != errno.EBADF:
|
|
raise
|
|
os.close(null)
|
|
|
|
def main(argv):
|
|
log("main(%s)" % (argv,))
|
|
|
|
# check for version or help options (no args == help)
|
|
if not argv:
|
|
argv = ['--help']
|
|
if len(argv) == 1 and argv[0] in ['-h', '--help']:
|
|
config = TahoeFuseOptions()
|
|
print >> sys.stderr, config
|
|
print >> sys.stderr, 'fuse usage follows:'
|
|
if len(argv) == 1 and argv[0] in ['-h', '--help', '--version']:
|
|
launch_tahoe_fuse(TahoeFuseLocal, None, argv)
|
|
return -2
|
|
|
|
# parse command line options
|
|
config = TahoeFuseOptions()
|
|
try:
|
|
#print 'parsing', argv
|
|
config.parseOptions(argv)
|
|
except usage.error, e:
|
|
print config
|
|
print e
|
|
return -1
|
|
|
|
# check for which alias or uri is specified
|
|
if config['alias']:
|
|
alias = config['alias']
|
|
#print 'looking for aliases in', config['node-directory']
|
|
aliases = get_aliases(os.path.expanduser(config['node-directory']))
|
|
if alias not in aliases:
|
|
raise usage.error('Alias %r not found' % (alias,))
|
|
root_uri = aliases[alias]
|
|
root_name = alias
|
|
elif config['root-uri']:
|
|
root_uri = config['root-uri']
|
|
root_name = 'uri_' + base32.b2a(tagged_hash('root_name', root_uri))[:12]
|
|
# test the uri for structural validity:
|
|
try:
|
|
DirectoryURI.init_from_string(root_uri)
|
|
except:
|
|
raise usage.error('root-uri must be a valid directory uri (not %r)' % (root_uri,))
|
|
else:
|
|
raise usage.error('At least one of --alias or --root-uri must be specified')
|
|
|
|
nodedir = config['node-directory']
|
|
nodeurl = config['node-url']
|
|
if not nodeurl:
|
|
nodeurl = getnodeurl(nodedir)
|
|
|
|
# allocate socket
|
|
socket_dir = os.path.join(os.path.expanduser(nodedir), "tfuse.sockets")
|
|
socket_path = os.path.join(socket_dir, root_name)
|
|
if len(socket_path) > 103:
|
|
# try googling AF_UNIX and sun_len for some taste of why this oddity exists.
|
|
raise OSError(errno.ENAMETOOLONG, 'socket path too long (%s)' % (socket_path,))
|
|
|
|
fileutil.make_dirs(socket_dir, 0700)
|
|
if os.path.exists(socket_path):
|
|
log('socket exists')
|
|
if config['server-shutdown']:
|
|
log('calling shutdown')
|
|
trpc = TRPC(socket_path)
|
|
result = trpc.shutdown()
|
|
log('result: %r' % (result,))
|
|
log('called shutdown')
|
|
return
|
|
else:
|
|
raise OSError(errno.EEXIST, 'fuse already running (%r exists)' % (socket_path,))
|
|
elif config['server-shutdown']:
|
|
raise OSError(errno.ENOTCONN, '--server-shutdown specified, but server not running')
|
|
|
|
if not os.path.exists(config.mountpoint):
|
|
raise OSError(errno.ENOENT, 'No such file or directory: "%s"' % (config.mountpoint,))
|
|
|
|
global _tfs
|
|
#
|
|
# Standalone ("no-split")
|
|
#
|
|
if config['no-split']:
|
|
reopen_logfile('tfuse.%s.unsplit.log' % (root_name,))
|
|
log('\n'+(24*'_')+'init (unsplit)'+(24*'_')+'\n')
|
|
|
|
cache_timeout = float(config['cache-timeout'])
|
|
tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout, async=False)
|
|
#print tfs.pprint()
|
|
|
|
# make tfs instance accesible to print_tree() for dbg
|
|
_tfs = tfs
|
|
|
|
args = [ '-o'+opt for opt in config.fuse_options ] + [config.mountpoint]
|
|
launch_tahoe_fuse(TahoeFuseLocal, tfs, args)
|
|
|
|
#
|
|
# Server
|
|
#
|
|
elif config['server']:
|
|
reopen_logfile('tfuse.%s.server.log' % (root_name,))
|
|
log('\n'+(24*'_')+'init (server)'+(24*'_')+'\n')
|
|
|
|
log('daemonizing')
|
|
daemonize()
|
|
|
|
try:
|
|
cache_timeout = float(config['cache-timeout'])
|
|
tfs = TFS(nodedir, nodeurl, root_uri, cache_timeout, async=True)
|
|
#print tfs.pprint()
|
|
|
|
# make tfs instance accesible to print_tree() for dbg
|
|
_tfs = tfs
|
|
|
|
log('launching tfs server')
|
|
tfuse = TahoeFuseBase(tfs)
|
|
tfs_server = TFSServer(socket_path, tfuse)
|
|
tfs_server.run()
|
|
log('tfs server ran, exiting')
|
|
except:
|
|
log('exception: ' + traceback.format_exc())
|
|
|
|
#
|
|
# Client
|
|
#
|
|
else:
|
|
reopen_logfile('tfuse.%s.client.log' % (root_name,))
|
|
log('\n'+(24*'_')+'init (client)'+(24*'_')+'\n')
|
|
|
|
server_args = [sys.executable, sys.argv[0], '--server'] + argv
|
|
if 'Allmydata.app/Contents/MacOS' in sys.executable:
|
|
# in this case blackmatch is the 'fuse' subcommand of the 'tahoe' executable
|
|
# otherwise we assume blackmatch is being run from source
|
|
server_args.insert(2, 'fuse')
|
|
#print 'launching server:', server_args
|
|
server = subprocess.Popen(server_args)
|
|
waiting_since = time.time()
|
|
wait_at_most = 8
|
|
while not os.path.exists(socket_path):
|
|
log('waiting for appearance of %r' % (socket_path,))
|
|
time.sleep(1)
|
|
if time.time() - waiting_since > wait_at_most:
|
|
log('%r did not appear within %ss' % (socket_path, wait_at_most))
|
|
raise IOError(2, 'no socket %s' % (socket_path,))
|
|
#print 'launched server'
|
|
trpc = TRPC(socket_path)
|
|
|
|
|
|
args = [ '-o'+opt for opt in config.fuse_options ] + [config.mountpoint]
|
|
launch_tahoe_fuse(TahoeFuseShim, trpc, args)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
sys.exit(main(sys.argv[1:]))
|