mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2024-12-21 13:57:51 +00:00
rerecord all the storageserver patches in one go
darcs was dying trying to deal with the conflict resolution patches. this adds a (very rough) bucketstore and storageserver. probably needs lots of work both in api and implementation.
This commit is contained in:
parent
918a1fca23
commit
94e051c1f0
122
allmydata/bucketstore.py
Normal file
122
allmydata/bucketstore.py
Normal file
@ -0,0 +1,122 @@
|
|||||||
|
import os
|
||||||
|
|
||||||
|
from foolscap import Referenceable
|
||||||
|
from twisted.application import service
|
||||||
|
from twisted.python.failure import Failure
|
||||||
|
from allmydata.util import idlib
|
||||||
|
|
||||||
|
from amdlib.util.assertutil import precondition
|
||||||
|
|
||||||
|
class NoSuchBucketError(Failure):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class BucketStore(service.MultiService, Referenceable):
|
||||||
|
def __init__(self, store_dir):
|
||||||
|
precondition(os.path.isdir(store_dir))
|
||||||
|
service.MultiService.__init__(self)
|
||||||
|
self._store_dir = store_dir
|
||||||
|
|
||||||
|
self._buckets = {} # v_id -> Bucket()
|
||||||
|
self._leases = set() # should do weakref dances.
|
||||||
|
|
||||||
|
def _get_bucket_dir(self, verifierid):
|
||||||
|
avid = idlib.b2a(verifierid)
|
||||||
|
return os.path.join(self._store_dir, avid)
|
||||||
|
|
||||||
|
def has_bucket(self, verifierid):
|
||||||
|
return os.path.exists(self._get_bucket_dir(verifierid))
|
||||||
|
|
||||||
|
def allocate_bucket(self, verifierid, bucket_num, size, leaser_credentials):
|
||||||
|
bucket_dir = self._get_bucket_dir(verifierid)
|
||||||
|
precondition(not os.path.exists(bucket_dir))
|
||||||
|
precondition(isinstance(bucket_num, int))
|
||||||
|
bucket = Bucket(bucket_dir, verifierid, bucket_num, size)
|
||||||
|
self._buckets[verifierid] = bucket
|
||||||
|
bucket.set_leaser(leaser_credentials)
|
||||||
|
lease = Lease(verifierid, leaser_credentials, bucket)
|
||||||
|
self._leases.add(lease)
|
||||||
|
return lease
|
||||||
|
|
||||||
|
def get_bucket(self, verifierid):
|
||||||
|
# for now, only returns those created by this process, in this run
|
||||||
|
bucket = self._buckets.get(verifierid)
|
||||||
|
if bucket:
|
||||||
|
return BucketReader(bucket)
|
||||||
|
else:
|
||||||
|
return NoSuchBucketError()
|
||||||
|
|
||||||
|
class Lease(Referenceable):
|
||||||
|
def __init__(self, verifierid, leaser, bucket):
|
||||||
|
self._leaser = leaser
|
||||||
|
self._verifierid = verifierid
|
||||||
|
self._bucket = bucket
|
||||||
|
|
||||||
|
def get_bucket(self):
|
||||||
|
return self._bucket
|
||||||
|
|
||||||
|
def remote_write(self, data):
|
||||||
|
self._bucket.write(data)
|
||||||
|
|
||||||
|
def remote_finalise(self):
|
||||||
|
self._bucket.finalise()
|
||||||
|
|
||||||
|
class BucketReader(Referenceable):
|
||||||
|
def __init__(self, bucket):
|
||||||
|
self._bucket = bucket
|
||||||
|
|
||||||
|
def remote_get_bucket_num(self):
|
||||||
|
return self._bucket.get_bucket_num()
|
||||||
|
|
||||||
|
def remote_read(self):
|
||||||
|
return self._bucket.read()
|
||||||
|
|
||||||
|
class Bucket:
|
||||||
|
def __init__(self, bucket_dir, verifierid, bucket_num, size):
|
||||||
|
os.mkdir(bucket_dir)
|
||||||
|
self._bucket_dir = bucket_dir
|
||||||
|
self._size = size
|
||||||
|
self._verifierid = verifierid
|
||||||
|
|
||||||
|
self._data = file(os.path.join(self._bucket_dir, 'data'), 'wb')
|
||||||
|
self._bytes_written = 0
|
||||||
|
|
||||||
|
self._write_attr('bucket_num', str(bucket_num))
|
||||||
|
|
||||||
|
def _write_attr(self, name, val):
|
||||||
|
f = file(os.path.join(self._bucket_dir, 'name'), 'wb')
|
||||||
|
f.write(val)
|
||||||
|
f.close()
|
||||||
|
|
||||||
|
def _read_attr(self, name):
|
||||||
|
f = file(os.path.join(self._bucket_dir, 'name'), 'wb')
|
||||||
|
data = f.read()
|
||||||
|
f.close()
|
||||||
|
return data
|
||||||
|
|
||||||
|
def set_leaser(self, leaser):
|
||||||
|
f = file(os.path.join(self._bucket_dir, 'leases'), 'wb')
|
||||||
|
f.write(leaser)
|
||||||
|
f.close()
|
||||||
|
|
||||||
|
def write(self, data):
|
||||||
|
precondition(len(data) + self._bytes_written <= self._size)
|
||||||
|
self._data.write(data)
|
||||||
|
self._data.flush()
|
||||||
|
|
||||||
|
def finalise(self):
|
||||||
|
precondition(self._bytes_written == self._size)
|
||||||
|
self._data.close()
|
||||||
|
|
||||||
|
def is_complete(self):
|
||||||
|
return os.path.getsize(os.path.join(self._bucket_dir, 'data')) == self._size
|
||||||
|
|
||||||
|
def get_bucket_num(self):
|
||||||
|
return int(self._read_attr('bucket_num'))
|
||||||
|
|
||||||
|
def read(self):
|
||||||
|
precondition(self.is_complete())
|
||||||
|
f = file(os.path.join(self._bucket_dir, 'data'), 'rb')
|
||||||
|
data = f.read()
|
||||||
|
f.close()
|
||||||
|
return data
|
||||||
|
|
@ -9,12 +9,12 @@ from twisted.internet import reactor
|
|||||||
from twisted.internet.base import BlockingResolver
|
from twisted.internet.base import BlockingResolver
|
||||||
reactor.installResolver(BlockingResolver())
|
reactor.installResolver(BlockingResolver())
|
||||||
|
|
||||||
class Storage(service.MultiService, Referenceable):
|
from allmydata.storageserver import StorageServer
|
||||||
name = "storage"
|
|
||||||
pass
|
|
||||||
|
|
||||||
class Client(service.MultiService, Referenceable):
|
class Client(service.MultiService, Referenceable):
|
||||||
CERTFILE = "client.pem"
|
CERTFILE = "client.pem"
|
||||||
|
AUTHKEYSFILE = "authorized_keys"
|
||||||
|
STOREDIR = 'storage'
|
||||||
|
|
||||||
def __init__(self, queen_pburl):
|
def __init__(self, queen_pburl):
|
||||||
service.MultiService.__init__(self)
|
service.MultiService.__init__(self)
|
||||||
@ -31,7 +31,7 @@ class Client(service.MultiService, Referenceable):
|
|||||||
self.queen = None # self.queen is either None or a RemoteReference
|
self.queen = None # self.queen is either None or a RemoteReference
|
||||||
self.all_peers = set()
|
self.all_peers = set()
|
||||||
self.connections = {}
|
self.connections = {}
|
||||||
s = Storage()
|
s = StorageServer(self.STOREDIR)
|
||||||
s.setServiceParent(self)
|
s.setServiceParent(self)
|
||||||
|
|
||||||
AUTHKEYSFILEBASE = "authorized_keys."
|
AUTHKEYSFILEBASE = "authorized_keys."
|
||||||
|
30
allmydata/storageserver.py
Normal file
30
allmydata/storageserver.py
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
import os
|
||||||
|
|
||||||
|
from foolscap import Referenceable
|
||||||
|
from twisted.application import service
|
||||||
|
from twisted.python.failure import Failure
|
||||||
|
|
||||||
|
from amdlib.util.assertutil import precondition
|
||||||
|
|
||||||
|
from allmydata.bucketstore import BucketStore
|
||||||
|
|
||||||
|
class BucketAlreadyExistsError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class StorageServer(service.MultiService, Referenceable):
|
||||||
|
name = 'storageserver'
|
||||||
|
|
||||||
|
def __init__(self, store_dir):
|
||||||
|
precondition(os.path.isdir(store_dir))
|
||||||
|
service.MultiService.__init__(self)
|
||||||
|
self._bucketstore = BucketStore(store_dir)
|
||||||
|
self._bucketstore.setServiceParent(self)
|
||||||
|
|
||||||
|
def remote_allocate_bucket(self, verifierid, bucket_num, size, leaser):
|
||||||
|
if self._bucketstore.has_bucket(verifierid):
|
||||||
|
raise BucketAlreadyExistsError()
|
||||||
|
lease = self._bucketstore.allocate_bucket(verifierid, bucket_num, size, leaser)
|
||||||
|
return lease
|
||||||
|
|
||||||
|
def remote_get_bucket(self, verifierid):
|
||||||
|
return self._bucketstore.get_bucket(verifierid)
|
7
allmydata/util/idlib.py
Normal file
7
allmydata/util/idlib.py
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
from base64 import b32encode, b32decode
|
||||||
|
|
||||||
|
def b2a(i):
|
||||||
|
return b32encode(i).lower()
|
||||||
|
|
||||||
|
def a2b(i):
|
||||||
|
return b32decode(i.upper())
|
Loading…
Reference in New Issue
Block a user