initial file-table support, VirtualDrive service, rearrange Storage somewhat

This commit is contained in:
Brian Warner 2006-12-03 19:07:41 -07:00
parent 6e09b5ddb9
commit dd212c09e1
11 changed files with 366 additions and 46 deletions

@ -9,6 +9,8 @@ run-client:
run-client2:
cd client-basedir2 && PYTHONPATH=.. twistd -noy ../client.tac
run-client3:
cd client-basedir3 && PYTHONPATH=.. twistd -noy ../client.tac
test:
trial allmydata

@ -27,13 +27,14 @@ class BucketStore(service.MultiService, Referenceable):
def has_bucket(self, verifierid):
return os.path.exists(self._get_bucket_dir(verifierid))
def allocate_bucket(self, verifierid, bucket_num, size, leaser_credentials):
def allocate_bucket(self, verifierid, bucket_num, size,
leaser_credentials, canary):
bucket_dir = self._get_bucket_dir(verifierid)
precondition(not os.path.exists(bucket_dir))
precondition(isinstance(bucket_num, int))
bucket = WriteBucket(bucket_dir, verifierid, bucket_num, size)
bucket.set_leaser(leaser_credentials)
lease = Lease(verifierid, leaser_credentials, bucket)
lease = Lease(verifierid, leaser_credentials, bucket, canary)
self._leases.add(lease)
return lease
@ -42,18 +43,18 @@ class BucketStore(service.MultiService, Referenceable):
bucket_dir = self._get_bucket_dir(verifierid)
if os.path.exists(bucket_dir):
b = ReadBucket(bucket_dir, verifierid)
br = BucketReader(b)
return [(b.get_bucket_num(), br)]
return [(b.get_bucket_num(), b)]
else:
return []
class Lease(Referenceable):
implements(RIBucketWriter)
def __init__(self, verifierid, leaser, bucket):
def __init__(self, verifierid, leaser, bucket, canary):
self._leaser = leaser
self._verifierid = verifierid
self._bucket = bucket
canary.notifyOnDisconnect(self._lost_canary)
def get_bucket(self):
return self._bucket
@ -64,17 +65,8 @@ class Lease(Referenceable):
def remote_close(self):
self._bucket.close()
class BucketReader(Referenceable):
implements(RIBucketReader)
def __init__(self, bucket):
self._bucket = bucket
def remote_get_bucket_num(self):
return self._bucket.get_bucket_num()
def remote_read(self):
return self._bucket.read()
def _lost_canary(self):
pass
class Bucket:
def __init__(self, bucket_dir, verifierid):
@ -127,14 +119,17 @@ class WriteBucket(Bucket):
_assert(os.path.getsize(os.path.join(self._bucket_dir, 'data')) == self._size)
return complete
class ReadBucket(Bucket):
class ReadBucket(Bucket, Referenceable):
implements(RIBucketReader)
def __init__(self, bucket_dir, verifierid):
Bucket.__init__(self, bucket_dir, verifierid)
precondition(self.is_complete()) # implicitly asserts bucket_dir exists
def get_bucket_num(self):
return int(self._read_attr('bucket_num'))
remote_get_bucket_num = get_bucket_num
def read(self):
return self._read_attr('data')
remote_read = read

@ -9,10 +9,11 @@ from allmydata import node
from twisted.internet import defer
from allmydata.util import idlib
from allmydata.storageserver import StorageServer
from allmydata.upload import Uploader
from allmydata.download import Downloader
from allmydata.util import idlib
from allmydata.vdrive import VDrive
class Client(node.Node, Referenceable):
implements(RIClient)
@ -29,6 +30,7 @@ class Client(node.Node, Referenceable):
self.add_service(StorageServer(os.path.join(basedir, self.STOREDIR)))
self.add_service(Uploader())
self.add_service(Downloader())
self.add_service(VDrive())
self.queen_pburl = None
self.queen_connector = None
@ -63,14 +65,21 @@ class Client(node.Node, Referenceable):
self.log("connected to queen")
self.queen = queen
queen.notifyOnDisconnect(self._lost_queen)
queen.callRemote("hello",
nodeid=self.nodeid, node=self, pburl=self.my_pburl)
d = queen.callRemote("hello",
nodeid=self.nodeid,
node=self,
pburl=self.my_pburl)
d.addCallback(self._got_vdrive_root)
def _got_vdrive_root(self, root):
self.getServiceNamed("vdrive").set_root(root)
def _lost_queen(self):
self.log("lost connection to queen")
self.queen = None
def remote_get_service(self, name):
# TODO: 'vdrive' should not be public in the medium term
return self.getServiceNamed(name)
def remote_add_peers(self, new_peers):

98
allmydata/filetable.py Normal file

@ -0,0 +1,98 @@
import os, shutil
from zope.interface import implements
from foolscap import Referenceable
from allmydata.interfaces import RIMutableDirectoryNode
from twisted.application import service
from twisted.python import log
class DeadDirectoryNodeError(Exception):
"""The directory referenced by this node has been deleted."""
class BadDirectoryError(Exception):
"""There was a problem with the directory being referenced."""
class BadFileError(Exception):
"""The file being referenced does not exist."""
class MutableDirectoryNode(Referenceable):
implements(RIMutableDirectoryNode)
def __init__(self, basedir):
self._basedir = basedir
def validate_name(self, name):
if name == "." or name == ".." or "/" in name:
raise DeadDirectoryNodeError("bad filename component")
# this is private
def get_child(self, name):
self.validate_name(name)
absname = os.path.join(self._basedir, name)
if os.path.isdir(absname):
return MutableDirectoryNode(absname)
raise DeadDirectoryNodeError("no such directory")
# these are the public methods, available to anyone who holds a reference
def list(self):
log.msg("Dir(%s).list" % self._basedir)
results = []
if not os.path.isdir(self._basedir):
raise DeadDirectoryNodeError("This directory has been deleted")
for name in os.listdir(self._basedir):
absname = os.path.join(self._basedir, name)
if os.path.isdir(absname):
results.append( (name, MutableDirectoryNode(absname)) )
elif os.path.isfile(absname):
f = open(absname, "rb")
data = f.read()
f.close()
results.append( (name, data) )
# anything else is ignored
return results
remote_list = list
def add_directory(self, name):
self.validate_name(name)
absname = os.path.join(self._basedir, name)
if os.path.isdir(absname):
raise BadDirectoryError("the directory '%s' already exists" % name)
if os.path.exists(absname):
raise BadDirectoryError("the directory '%s' already exists "
"(but isn't a directory)" % name)
os.mkdir(absname)
return MutableDirectoryNode(absname)
remote_add_directory = add_directory
def add_file(self, name, data):
self.validate_name(name)
f = open(os.path.join(self._basedir, name), "wb")
f.write(data)
f.close()
remote_add_file = add_file
def remove(self, name):
self.validate_name(name)
absname = os.path.join(self._basedir, name)
if os.path.isdir(absname):
shutil.rmtree(absname)
elif os.path.isfile(absname):
os.unlink(absname)
else:
raise BadFileError("Cannot delete non-existent file '%s'" % name)
class GlobalVirtualDrive(service.MultiService):
name = "filetable"
VDRIVEDIR = "vdrive"
def __init__(self, basedir="."):
service.MultiService.__init__(self)
vdrive_dir = os.path.join(basedir, self.VDRIVEDIR)
if not os.path.exists(vdrive_dir):
os.mkdir(vdrive_dir)
self._root = MutableDirectoryNode(vdrive_dir)
def get_root(self):
return self._root

@ -12,10 +12,12 @@ RIClient_ = Any()
Referenceable_ = Any()
RIBucketWriter_ = Any()
RIBucketReader_ = Any()
RIMutableDirectoryNode_ = Any()
RIMutableFileNode_ = Any()
class RIQueenRoster(RemoteInterface):
def hello(nodeid=Nodeid, node=RIClient_, pburl=PBURL):
return Nothing()
return RIMutableDirectoryNode_ # the virtual drive root
class RIClient(RemoteInterface):
def get_service(name=str):
@ -27,7 +29,8 @@ class RIClient(RemoteInterface):
class RIStorageServer(RemoteInterface):
def allocate_bucket(verifierid=Verifierid, bucket_num=int, size=int,
leaser=Nodeid):
leaser=Nodeid, canary=Referenceable_):
# if the canary is lost before close(), the bucket is deleted
return RIBucketWriter_
def get_buckets(verifierid=Verifierid):
return ListOf(TupleOf(int, RIBucketReader_))
@ -51,3 +54,20 @@ class RIBucketReader(RemoteInterface):
return ShareData
class RIMutableDirectoryNode(RemoteInterface):
def list():
return ListOf( TupleOf(str, # name, relative to directory
(RIMutableDirectoryNode_, Verifierid)),
maxLength=100,
)
def add_directory(name=str):
return RIMutableDirectoryNode_
def add_file(name=str, data=Verifierid):
return Nothing()
def remove(name=str):
return Nothing()
# need more to move directories

@ -8,6 +8,7 @@ from allmydata.util import idlib
from zope.interface import implements
from allmydata.interfaces import RIQueenRoster
from allmydata import node
from allmydata.filetable import GlobalVirtualDrive
class Roster(service.MultiService, Referenceable):
implements(RIQueenRoster)
@ -16,6 +17,10 @@ class Roster(service.MultiService, Referenceable):
service.MultiService.__init__(self)
self.phonebook = {}
self.connections = {}
self.gvd_root = None
def set_gvd_root(self, root):
self.gvd_root = root
def remote_hello(self, nodeid, node, pburl):
log.msg("roster: contact from %s" % idlib.b2a(nodeid))
@ -26,6 +31,7 @@ class Roster(service.MultiService, Referenceable):
self.phonebook[nodeid] = pburl
self.connections[nodeid] = node
node.notifyOnDisconnect(self._lost_node, nodeid)
return self.gvd_root
def _educate_the_new_peer(self, nodeid, node, new_peers):
log.msg("roster: educating %s (%d)" % (idlib.b2a(nodeid)[:4], len(new_peers)))
@ -56,6 +62,7 @@ class Queen(node.Node):
def __init__(self, basedir="."):
node.Node.__init__(self, basedir)
self.gvd = self.add_service(GlobalVirtualDrive(basedir))
self.urls = {}
def tub_ready(self):
@ -65,5 +72,5 @@ class Queen(node.Node):
f = open(os.path.join(self.basedir, "roster_pburl"), "w")
f.write(self.urls["roster"] + "\n")
f.close()
r.set_gvd_root(self.gvd.get_root())

@ -22,11 +22,12 @@ class StorageServer(service.MultiService, Referenceable):
self._bucketstore = BucketStore(store_dir)
self._bucketstore.setServiceParent(self)
def remote_allocate_bucket(self, verifierid, bucket_num, size, leaser):
def remote_allocate_bucket(self, verifierid, bucket_num, size, leaser,
canary):
if self._bucketstore.has_bucket(verifierid):
raise BucketAlreadyExistsError()
lease = self._bucketstore.allocate_bucket(verifierid, bucket_num, size,
idlib.b2a(leaser))
idlib.b2a(leaser), canary)
return lease
def remote_get_buckets(self, verifierid):

@ -0,0 +1,51 @@
import os
from twisted.trial import unittest
from allmydata.filetable import MutableDirectoryNode, \
DeadDirectoryNodeError, BadDirectoryError, BadFileError
class FileTable(unittest.TestCase):
def test_files(self):
os.mkdir("filetable")
root = MutableDirectoryNode(os.path.abspath("filetable"))
self.failUnlessEqual(root.list(), [])
root.add_file("one", "vid-one")
root.add_file("two", "vid-two")
self.failUnlessEqual(root.list(), [("one", "vid-one"),
("two", "vid-two")])
root.remove("two")
self.failUnlessEqual(root.list(), [("one", "vid-one")])
self.failUnlessRaises(BadFileError, root.remove, "two")
self.failUnlessRaises(BadFileError, root.remove, "three")
# now play with directories
subdir1 = root.add_directory("subdir1")
self.failUnless(isinstance(subdir1, MutableDirectoryNode))
entries = root.list()
self.failUnlessEqual(len(entries), 2)
one_index = entries.index( ("one", "vid-one") )
subdir_index = 1 - one_index
self.failUnlessEqual(entries[subdir_index][0], "subdir1")
subdir2 = entries[subdir_index][1]
self.failUnless(isinstance(subdir2, MutableDirectoryNode))
self.failUnlessEqual(subdir1.list(), [])
self.failUnlessEqual(subdir2.list(), [])
subdir1.add_file("subone", "vid-subone")
self.failUnlessEqual(subdir1.list(), [("subone", "vid-subone")])
self.failUnlessEqual(subdir2.list(), [("subone", "vid-subone")])
self.failUnlessEqual(len(root.list()), 2)
self.failUnlessRaises(BadDirectoryError, root.add_directory, "subdir1")
self.failUnlessRaises(BadDirectoryError, root.add_directory, "one")
root.remove("subdir1")
self.failUnlessEqual(root.list(), [("one", "vid-one")])
# should our (orphaned) subdir1/subdir2 node still be able to do
# anything?
self.failUnlessRaises(DeadDirectoryNodeError, subdir1.list)

@ -5,11 +5,14 @@ import random
from twisted.trial import unittest
from twisted.application import service
from twisted.internet import defer
from foolscap import Tub
from foolscap import Tub, Referenceable
from foolscap.eventual import flushEventualQueue
from allmydata import client
class Canary(Referenceable):
pass
class StorageTest(unittest.TestCase):
def setUp(self):
@ -39,6 +42,7 @@ class StorageTest(unittest.TestCase):
bucket_num=bnum,
size=len(data),
leaser=self.node.nodeid,
canary=Canary(),
)
rssd.addCallback(create_bucket)
@ -99,6 +103,7 @@ class StorageTest(unittest.TestCase):
bucket_num=bnum,
size=len(data),
leaser=self.node.nodeid,
canary=Canary(),
)
rssd.addCallback(create_bucket)

@ -1,7 +1,9 @@
from zope.interface import Interface, implements
from twisted.python import failure, log
from twisted.internet import defer
from twisted.application import service
from foolscap import Referenceable
from allmydata.util import idlib
from allmydata import encode
@ -33,6 +35,7 @@ class FileUploader:
filehandle.seek(0)
def make_encoder(self):
self._needed_shares = 4
self._shares = 4
self._encoder = encode.Encoder(self._filehandle, self._shares)
self._share_size = self._size
@ -53,6 +56,7 @@ class FileUploader:
max_peers = None
self.permuted = self._peer.permute_peerids(self._verifierid, max_peers)
self._total_peers = len(self.permuted)
for p in self.permuted:
assert isinstance(p, str)
# we will shrink self.permuted as we give up on peers
@ -68,7 +72,12 @@ class FileUploader:
def _check_next_peer(self):
if len(self.permuted) == 0:
# there are no more to check
raise NotEnoughPeersError
raise NotEnoughPeersError("%s goodness, want %s, have %d "
"landlords, %d total peers" %
(self.goodness_points,
self.target_goodness,
len(self.landlords),
self._total_peers))
if self.peer_index >= len(self.permuted):
self.peer_index = 0
@ -82,7 +91,8 @@ class FileUploader:
verifierid=self._verifierid,
bucket_num=bucket_num,
size=self._share_size,
leaser=self._peer.nodeid)
leaser=self._peer.nodeid,
canary=Referenceable())
def _allocate_response(bucket):
if self.debug:
print " peerid %s will grant us a lease" % idlib.b2a(peerid)
@ -127,6 +137,40 @@ class FileUploader:
def netstring(s):
return "%d:%s," % (len(s), s)
class IUploadable(Interface):
def get_filehandle():
pass
def close_filehandle(f):
pass
class FileName:
implements(IUploadable)
def __init__(self, filename):
self._filename = filename
def get_filehandle(self):
return open(self._filename, "rb")
def close_filehandle(self, f):
f.close()
class Data:
implements(IUploadable)
def __init__(self, data):
self._data = data
def get_filehandle(self):
return StringIO(self._data)
def close_filehandle(self, f):
pass
class FileHandle:
implements(IUploadable)
def __init__(self, filehandle):
self._filehandle = filehandle
def get_filehandle(self):
return self._filehandle
def close_filehandle(self, f):
# the originator of the filehandle reserves the right to close it
pass
class Uploader(service.MultiService):
"""I am a service that allows file uploading.
"""
@ -140,26 +184,26 @@ class Uploader(service.MultiService):
# note: this is only of the plaintext data, no encryption yet
return hasher.digest()
def upload_filename(self, filename):
f = open(filename, "rb")
def upload(self, f):
assert self.parent
assert self.running
f = IUploadable(f)
fh = f.get_filehandle()
u = FileUploader(self.parent)
u.set_filehandle(fh)
u.set_verifierid(self._compute_verifierid(fh))
u.make_encoder()
d = u.start()
def _done(res):
f.close()
f.close_filehandle(fh)
return res
d = self.upload_filehandle(f)
d.addBoth(_done)
return d
# utility functions
def upload_data(self, data):
f = StringIO(data)
return self.upload_filehandle(f)
def upload_filehandle(self, f):
assert self.parent
assert self.running
u = FileUploader(self.parent)
u.set_filehandle(f)
u.set_verifierid(self._compute_verifierid(f))
u.make_encoder()
d = u.start()
return d
return self.upload(Data(data))
def upload_filename(self, filename):
return self.upload(FileName(filename))
def upload_filehandle(self, filehandle):
return self.upload(FileHandle(filehandle))

88
allmydata/vdrive.py Normal file

@ -0,0 +1,88 @@
"""This is the client-side facility to manipulate virtual drives."""
from twisted.application import service
from twisted.internet import defer
from allmydata.upload import Data, FileHandle, FileName
class VDrive(service.MultiService):
name = "vdrive"
def set_root(self, root):
self.gvd_root = root
def dirpath(self, dir_or_path):
if isinstance(dir_or_path, str):
return self.get_dir(dir_or_path)
return defer.succeed(dir_or_path)
def get_dir(self, path):
"""Return a Deferred that fires with a RemoteReference to a
MutableDirectoryNode at the given /-delimited path."""
d = defer.succeed(self.gvd_root)
if path.startswith("/"):
path = path[1:]
if path == "":
return d
for piece in path.split("/"):
d.addCallback(lambda parent: parent.callRemote("list"))
def _find(table, subdir):
for name,target in table:
if name == subdir:
return subdir
else:
raise KeyError("no such directory '%s' in '%s'" %
(subdir, [t[0] for t in table]))
d.addCallback(_find, piece)
def _check(subdir):
assert not isinstance(subdir, str)
return subdir
d.addCallback(_check)
return d
def get_root(self):
return self.gvd_root
def listdir(self, dir_or_path):
d = self.dirpath(dir_or_path)
d.addCallback(lambda parent: parent.callRemote("list"))
def _list(table):
return [t[0] for t in table]
d.addCallback(_list)
return d
def put_file(self, dir_or_path, name, uploadable):
"""Upload an IUploadable and add it to the virtual drive (as an entry
called 'name', in 'dir_or_path') 'dir_or_path' must either be a
string like 'root/subdir1/subdir2', or a directory node (either the
root directory node returned by get_root(), or a subdirectory
returned by list() ).
The uploadable can be an instance of allmydata.upload.Data,
FileHandle, or FileName.
I return a deferred that will fire when the operation is complete.
"""
u = self.parent.getServiceNamed("uploader")
d = self.dirpath(dir_or_path)
def _got_dir(dirnode):
d1 = u.upload(uploadable)
d1.addCallback(lambda vid:
dirnode.callRemote("add_file", name, vid))
return d1
d.addCallback(_got_dir)
return d
def put_file_by_filename(self, dir_or_path, name, filename):
return self.put_file(dir_or_path, name, FileName(filename))
def put_file_by_data(self, dir_or_path, name, data):
return self.put_file(dir_or_path, name, Data(data))
def put_file_by_filehandle(self, dir_or_path, name, filehandle):
return self.put_file(dir_or_path, name, FileHandle(filehandle))
def make_directory(self, dir_or_path, name):
d = self.dirpath(dir_or_path)
d.addCallback(lambda parent: parent.callRemote("add_directory", name))
return d