split 'Bucket' into separate subclasses for read and write

This commit is contained in:
Rob Kinninmont 2006-12-01 03:04:54 -07:00
parent 5e40272785
commit 14a2dbd553

View File

@ -16,7 +16,6 @@ class BucketStore(service.MultiService, Referenceable):
service.MultiService.__init__(self) service.MultiService.__init__(self)
self._store_dir = store_dir self._store_dir = store_dir
self._buckets = {} # v_id -> Bucket()
self._leases = set() # should do weakref dances. self._leases = set() # should do weakref dances.
def _get_bucket_dir(self, verifierid): def _get_bucket_dir(self, verifierid):
@ -30,8 +29,7 @@ class BucketStore(service.MultiService, Referenceable):
bucket_dir = self._get_bucket_dir(verifierid) bucket_dir = self._get_bucket_dir(verifierid)
precondition(not os.path.exists(bucket_dir)) precondition(not os.path.exists(bucket_dir))
precondition(isinstance(bucket_num, int)) precondition(isinstance(bucket_num, int))
bucket = Bucket(bucket_dir, verifierid, bucket_num, size) bucket = WriteBucket(bucket_dir, verifierid, bucket_num, size)
self._buckets[verifierid] = bucket
bucket.set_leaser(leaser_credentials) bucket.set_leaser(leaser_credentials)
lease = Lease(verifierid, leaser_credentials, bucket) lease = Lease(verifierid, leaser_credentials, bucket)
self._leases.add(lease) self._leases.add(lease)
@ -39,14 +37,9 @@ class BucketStore(service.MultiService, Referenceable):
def get_bucket(self, verifierid): def get_bucket(self, verifierid):
# for now, only returns those created by this process, in this run # for now, only returns those created by this process, in this run
bucket = self._buckets.get(verifierid) bucket_dir = self._get_bucket_dir(verifierid)
if bucket: if os.path.exists(bucket_dir):
precondition(bucket.is_complete()) return BucketReader(ReadBucket(bucket_dir, verifierid))
return BucketReader(bucket)
elif os.path.exists(self._get_bucket_dir(verifierid)):
bucket_dir = self._get_bucket_dir(verifierid)
bucket = Bucket(bucket_dir, verifierid, None, None)
return BucketReader(bucket)
else: else:
return NoSuchBucketError() return NoSuchBucketError()
@ -76,26 +69,10 @@ class BucketReader(Referenceable):
return self._bucket.read() return self._bucket.read()
class Bucket: class Bucket:
def __init__(self, bucket_dir, verifierid, bucket_num, size): def __init__(self, bucket_dir, verifierid):
if not os.path.isdir(bucket_dir):
os.mkdir(bucket_dir)
self._bucket_dir = bucket_dir self._bucket_dir = bucket_dir
self._verifierid = verifierid self._verifierid = verifierid
if size is not None:
self._size = size
self._data = file(os.path.join(self._bucket_dir, 'data'), 'wb')
self._bytes_written = 0
else:
precondition(os.path.exists(os.path.join(self._bucket_dir, 'closed')))
self._size = os.path.getsize(os.path.join(self._bucket_dir, 'data'))
self._bytes_written = self._size
if bucket_num is not None:
self._write_attr('bucket_num', str(bucket_num))
#else:
#bucket_num = int(self._read_attr('bucket_num'))
def _write_attr(self, name, val): def _write_attr(self, name, val):
f = file(os.path.join(self._bucket_dir, name), 'wb') f = file(os.path.join(self._bucket_dir, name), 'wb')
f.write(val) f.write(val)
@ -107,10 +84,23 @@ class Bucket:
f.close() f.close()
return data return data
def is_complete(self):
return os.path.exists(os.path.join(self._bucket_dir, 'closed'))
class WriteBucket(Bucket):
def __init__(self, bucket_dir, verifierid, bucket_num, size):
Bucket.__init__(self, bucket_dir, verifierid)
precondition(not os.path.exists(bucket_dir))
os.mkdir(bucket_dir)
self._size = size
self._data = file(os.path.join(self._bucket_dir, 'data'), 'wb')
self._bytes_written = 0
self._write_attr('bucket_num', str(bucket_num))
def set_leaser(self, leaser): def set_leaser(self, leaser):
f = file(os.path.join(self._bucket_dir, 'leases'), 'wb') self._write_attr('leases', leaser)
f.write(leaser)
f.close()
def write(self, data): def write(self, data):
precondition(len(data) + self._bytes_written <= self._size) precondition(len(data) + self._bytes_written <= self._size)
@ -124,15 +114,19 @@ class Bucket:
self._write_attr('closed', '') self._write_attr('closed', '')
def is_complete(self): def is_complete(self):
return os.path.getsize(os.path.join(self._bucket_dir, 'data')) == self._size complete = Bucket.is_complete(self)
if complete:
_assert(os.path.getsize(os.path.join(self._bucket_dir, 'data')) == self._size)
return complete
class ReadBucket(Bucket):
def __init__(self, bucket_dir, verifierid):
Bucket.__init__(self, bucket_dir, verifierid)
precondition(self.is_complete()) # implicitly asserts bucket_dir exists
def get_bucket_num(self): def get_bucket_num(self):
return int(self._read_attr('bucket_num')) return int(self._read_attr('bucket_num'))
def read(self): def read(self):
precondition(self.is_complete()) return self._read_attr('data')
f = file(os.path.join(self._bucket_dir, 'data'), 'rb')
data = f.read()
f.close()
return data