2007-08-27 23:41:40 -07:00
import os, re, weakref, stat, struct, time
2008-03-13 09:10:11 -07:00
from distutils.version import LooseVersion
2007-09-17 00:48:40 -07:00
from itertools import chain
2006-11-30 20:14:23 -07:00
from foolscap import Referenceable
from twisted.application import service
2007-07-13 14:04:49 -07:00
from twisted.internet import defer
2006-11-30 20:14:23 -07:00
2006-12-01 19:17:50 -07:00
from zope.interface import implements
2007-04-04 15:59:36 -07:00
from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
2007-10-30 19:47:36 -07:00
RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE, \
2008-01-31 21:10:15 -07:00
BadWriteEnablerError, IStatsProducer
2008-02-14 19:27:47 -07:00
from allmydata.util import base32, fileutil, idlib, mathutil, log
2007-09-04 09:00:24 -07:00
from allmydata.util.assertutil import precondition, _assert
2008-02-05 13:05:13 -07:00
import allmydata # for __version__
2006-11-30 20:14:23 -07:00
2007-10-30 19:47:36 -07:00
class DataTooLargeError(Exception):
2007-09-02 14:47:15 -07:00
# storage/
# storage/shares/incoming
2008-01-31 16:26:28 -07:00
# incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
# be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
# storage/shares/$START/$STORAGEINDEX
2008-05-07 08:39:03 -07:00
# Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
# base-32 chars).
2006-11-30 20:14:23 -07:00
2007-03-29 20:19:52 -07:00
# $SHARENUM matches this regex:
2007-10-31 01:44:01 -07:00
2007-03-29 20:19:52 -07:00
2007-09-02 14:47:15 -07:00
# each share file (in storage/shares/$START/$STORAGEINDEX/$SHARENUM) contains
# lease information and share data. The share data is accessed by
# RIBucketWriter.write and RIBucketReader.read . The lease information is not
# accessible through these interfaces.
# The share file has the following layout:
# 0x00: share file version number, four bytes, current version is 1
# 0x04: share data length, four bytes big-endian = A
# 0x08: number of leases, four bytes big-endian
# 0x0c: beginning of share data (described below, at WriteBucketProxy)
# A+0x0c = B: first lease. Lease format is:
# B+0x00: owner number, 4 bytes big-endian, 0 is reserved for no-owner
# B+0x04: renew secret, 32 bytes (SHA256)
# B+0x24: cancel secret, 32 bytes (SHA256)
# B+0x44: expiration time, 4 bytes big-endian seconds-since-epoch
# B+0x48: next lease, or end of record
2008-02-12 20:48:37 -07:00
def si_b2a(storageindex):
    """Return the base32 text form of a binary storage index."""
    encoded = base32.b2a(storageindex)
    return encoded
2008-02-12 20:48:37 -07:00
def si_a2b(ascii_storageindex):
    """Return the binary storage index for its base32 text form."""
    decoded = base32.a2b(ascii_storageindex)
    return decoded
2008-02-12 20:48:37 -07:00
2008-01-31 16:26:28 -07:00
def storage_index_to_dir(storageindex):
    """Return the relative directory "$START/$STORAGEINDEX" for a share.

    $STORAGEINDEX is the base32 form of storageindex and $START is its
    first two base32 characters.
    """
    encoded = si_b2a(storageindex)
    prefix = encoded[:2]
    return os.path.join(prefix, encoded)
2008-01-31 16:26:28 -07:00
2007-09-02 14:47:15 -07:00
class ShareFile:
    """Accessor for an immutable share file and the leases stored in it.

    On-disk layout (all integers big-endian):
      0x00: version number, 4 bytes (must be 1)
      0x04: share data length A, 4 bytes
      0x08: number of leases, 4 bytes
      0x0c: share data, A bytes
      0x0c+A: lease records, LEASE_SIZE bytes each:
              owner number (4), renew secret (32), cancel secret (32),
              expiration time (4, seconds since epoch)

    Every method opens the file itself and closes it before returning, so
    no file handle outlives a call.
    """

    LEASE_SIZE = struct.calcsize(">L32s32sL")

    def __init__(self, filename):
        """Read and cache the header of the existing share file `filename`."""
        self.home = filename
        f = open(self.home, 'rb')
        try:
            (version, size, num_leases) = struct.unpack(">LLL", f.read(0xc))
        finally:
            f.close()
        assert version == 1
        self._size = size
        self._num_leases = num_leases
        self._data_offset = 0xc
        self._lease_offset = 0xc + self._size

    def read_share_data(self, offset, length):
        """Return `length` bytes of share data starting at `offset`."""
        precondition(offset >= 0)
        precondition(offset+length <= self._size)
        f = open(self.home, 'rb')
        try:
            # offsets are relative to the start of the data region, not to
            # the start of the file
            f.seek(self._data_offset+offset)
            return f.read(length)
        finally:
            f.close()

    def write_share_data(self, offset, data):
        """Write `data` into the share data region at `offset`."""
        length = len(data)
        precondition(offset >= 0)
        precondition(offset+length <= self._size)
        f = open(self.home, 'rb+')
        try:
            real_offset = self._data_offset+offset
            f.seek(real_offset)
            assert f.tell() == real_offset
            f.write(data)
        finally:
            f.close()

    def _write_lease_record(self, f, lease_number, lease_info):
        """Write `lease_info` into lease slot `lease_number` of open file f."""
        (owner_num, renew_secret, cancel_secret, expiration_time) = lease_info
        offset = self._lease_offset + lease_number * self.LEASE_SIZE
        f.seek(offset)
        assert f.tell() == offset
        # expiration times may arrive as floats; the record stores an int
        f.write(struct.pack(">L32s32sL",
                            owner_num, renew_secret, cancel_secret,
                            int(expiration_time)))

    def _read_num_leases(self, f):
        """Return the lease count stored in the header of open file f."""
        f.seek(0x08)
        (num_leases,) = struct.unpack(">L", f.read(4))
        return num_leases

    def _write_num_leases(self, f, num_leases):
        """Store `num_leases` in the header of open file f."""
        f.seek(0x08)
        f.write(struct.pack(">L", num_leases))

    def _truncate_leases(self, f, num_leases):
        """Cut the file off right after the first `num_leases` records."""
        f.truncate(self._lease_offset + num_leases * self.LEASE_SIZE)

    def iter_leases(self):
        """Yields (ownernum, renew_secret, cancel_secret, expiration_time)
        for all leases."""
        # Read every record before yielding, so the file is already closed
        # if the caller reopens it for writing while iterating.
        f = open(self.home, 'rb')
        try:
            (version, size, num_leases) = struct.unpack(">LLL", f.read(0xc))
            f.seek(self._lease_offset)
            records = [f.read(self.LEASE_SIZE) for _ in range(num_leases)]
        finally:
            f.close()
        for data in records:
            if data:
                yield struct.unpack(">L32s32sL", data)

    def add_lease(self, lease_info):
        """Append `lease_info` as a new lease record."""
        f = open(self.home, 'rb+')
        try:
            num_leases = self._read_num_leases(f)
            self._write_lease_record(f, num_leases, lease_info)
            # bump the count only after the record itself is written
            self._write_num_leases(f, num_leases+1)
        finally:
            f.close()

    def renew_lease(self, renew_secret, new_expire_time):
        """Extend the lease matching `renew_secret` to `new_expire_time`.

        The stored expiration is only ever moved later. Raises IndexError
        if no lease carries `renew_secret`.
        """
        for i,(on,rs,cs,et) in enumerate(self.iter_leases()):
            if rs == renew_secret:
                if new_expire_time > et:
                    new_lease = (on,rs,cs,new_expire_time)
                    f = open(self.home, 'rb+')
                    try:
                        self._write_lease_record(f, i, new_lease)
                    finally:
                        f.close()
                return
        raise IndexError("unable to renew non-existent lease")

    def add_or_renew_lease(self, lease_info):
        """Renew the lease with this renew secret, or add it if absent."""
        owner_num, renew_secret, cancel_secret, expire_time = lease_info
        try:
            self.renew_lease(renew_secret, expire_time)
        except IndexError:
            self.add_lease(lease_info)

    def cancel_lease(self, cancel_secret):
        """Remove a lease with the given cancel_secret. Return
        (num_remaining_leases, space_freed). Raise IndexError if there was no
        lease with the given cancel_secret."""
        leases = list(self.iter_leases())
        remaining = [lease for lease in leases if lease[2] != cancel_secret]
        num_leases_removed = len(leases) - len(remaining)
        if not num_leases_removed:
            raise IndexError("unable to find matching lease to cancel")
        # pack and write out the remaining leases. We write these out in
        # the same order as they were added, so that if we crash while
        # doing this, we won't lose any non-cancelled leases.
        f = open(self.home, 'rb+')
        try:
            for i,lease in enumerate(remaining):
                self._write_lease_record(f, i, lease)
            self._write_num_leases(f, len(remaining))
            self._truncate_leases(f, len(remaining))
        finally:
            f.close()
        return len(remaining), self.LEASE_SIZE * num_leases_removed
2007-03-29 20:19:52 -07:00
class BucketWriter(Referenceable):
2008-01-14 21:22:55 -07:00
def __init__(self, ss, incominghome, finalhome, size, lease_info, canary):
2007-07-03 17:08:02 -07:00
self.ss = ss
2007-03-30 10:52:19 -07:00
self.incominghome = incominghome
2007-03-29 20:19:52 -07:00
self.finalhome = finalhome
2007-07-13 14:04:49 -07:00
self._size = size
2008-01-14 21:22:55 -07:00
self._canary = canary
self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected)
2007-03-29 20:19:52 -07:00
self.closed = False
2007-07-16 18:07:03 -07:00
self.throw_out_all_data = False
2007-09-02 14:47:15 -07:00
# touch the file, so later callers will see that we're working on it.
2008-01-31 16:26:28 -07:00
assert not os.path.exists(incominghome)
2007-09-02 14:47:15 -07:00
# Also construct the metadata.
2008-01-31 16:26:28 -07:00
f = open(incominghome, 'wb')
2007-09-02 14:47:15 -07:00
f.write(struct.pack(">LLL", 1, size, 0))
2007-07-13 14:04:49 -07:00
2008-01-31 16:26:28 -07:00
self._sharefile = ShareFile(incominghome)
2007-09-02 14:57:49 -07:00
# also, add our lease to the file now, so that other ones can be
# added by simultaneous uploaders
2007-03-29 20:19:52 -07:00
2007-07-03 17:08:02 -07:00
def allocated_size(self):
2007-07-13 14:04:49 -07:00
return self._size
2007-07-03 17:08:02 -07:00
2007-07-13 14:04:49 -07:00
def remote_write(self, offset, data):
2007-03-29 20:19:52 -07:00
precondition(not self.closed)
2007-07-16 18:07:03 -07:00
if self.throw_out_all_data:
2007-09-02 14:47:15 -07:00
self._sharefile.write_share_data(offset, data)
2007-06-01 18:48:01 -07:00
2007-03-30 16:50:50 -07:00
def remote_close(self):
2007-03-29 20:19:52 -07:00
precondition(not self.closed)
2008-01-31 16:26:28 -07:00
2007-03-30 10:52:19 -07:00
fileutil.rename(self.incominghome, self.finalhome)
2008-01-31 16:26:28 -07:00
except EnvironmentError:
2007-09-02 14:47:15 -07:00
self._sharefile = None
2007-03-29 20:19:52 -07:00
self.closed = True
2008-01-14 21:22:55 -07:00
2007-09-02 14:47:15 -07:00
2007-07-13 14:04:49 -07:00
filelen = os.stat(self.finalhome)[stat.ST_SIZE]
self.ss.bucket_writer_closed(self, filelen)
2007-03-29 20:19:52 -07:00
2008-01-14 21:22:55 -07:00
def _disconnected(self):
if not self.closed:
def remote_abort(self):
log.msg("storage: aborting sharefile %s" % self.incominghome,
facility="tahoe.storage", level=log.UNUSUAL)
if not self.closed:
def _abort(self):
if self.closed:
# if we were the last share to be moved, remove the incoming/
# directory that was our parent
parentdir = os.path.split(self.incominghome)[0]
if not os.listdir(parentdir):
2007-03-29 20:19:52 -07:00
class BucketReader(Referenceable):
    """Remotely-callable reader for the share file at a given path."""

    def __init__(self, sharefname):
        # the ShareFile constructor reads and checks the file header
        self._share_file = ShareFile(sharefname)

    def remote_read(self, offset, length):
        """Return `length` bytes of share data starting at `offset`."""
        share = self._share_file
        return share.read_share_data(offset, length)
2007-06-01 18:48:01 -07:00
2007-09-17 00:48:40 -07:00
2007-10-30 19:47:36 -07:00
# the MutableShareFile is like the ShareFile, but used for mutable data. It
# has a different layout. See docs/mutable.txt for more details.
# # offset size name
# 1 0 32 magic verstr "tahoe mutable container v1" plus binary
2007-11-06 18:49:59 -07:00
# 2 32 20 write enabler's nodeid
# 3 52 32 write enabler
# 4 84 8 data size (actual share data present) (a)
# 5 92 8 offset of (8) count of extra leases (after data)
# 6 100 368 four leases, 92 bytes each
2007-10-30 19:47:36 -07:00
# 0 4 ownerid (0 means "no lease here")
# 4 4 expiration timestamp
# 8 32 renewal token
# 40 32 cancel token
2007-11-06 18:49:59 -07:00
# 72 20 nodeid which accepted the tokens
# 7 468 (a) data
2007-10-30 19:47:36 -07:00
# 8 ?? 4 count of extra leases
2007-11-06 18:49:59 -07:00
# 9 ?? n*92 extra leases
2007-10-30 19:47:36 -07:00
# The container layout relies on 4-byte "L" and 8-byte "Q" fields. Check the
# standard (">"-prefixed) sizes, which are what every pack/unpack call in
# this file uses; native "L" may be 8 bytes on some platforms. The previous
# form, `assert struct.calcsize("L"), 4`, could never fail because the 4 was
# only the assertion message.
assert struct.calcsize(">L") == 4
assert struct.calcsize(">Q") == 8
2007-11-05 20:17:14 -07:00
class MutableShareFile:
2007-10-30 19:47:36 -07:00
2007-11-06 18:49:59 -07:00
DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s")
2007-10-31 00:10:40 -07:00
2007-11-06 18:49:59 -07:00
HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases
LEASE_SIZE = struct.calcsize(">LL32s32s20s")
assert LEASE_SIZE == 92
2007-10-30 19:47:36 -07:00
2007-11-06 18:49:59 -07:00
2007-10-30 19:47:36 -07:00
# our sharefiles share with a recognizable string, plus some random
# binary data to reduce the chance that a regular text file will look
# like a sharefile.
MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
assert len(MAGIC) == 32
MAX_SIZE = 2*1000*1000*1000 # 2GB, kind of arbitrary
# TODO: decide upon a policy for max share size
2008-01-14 11:58:58 -07:00
def __init__(self, filename, parent=None):
2007-10-30 19:47:36 -07:00
self.home = filename
if os.path.exists(self.home):
2007-10-31 00:10:40 -07:00
# we don't cache anything, just check the magic
2007-10-30 19:47:36 -07:00
f = open(self.home, 'rb')
2007-10-31 00:10:40 -07:00
data = f.read(self.HEADER_SIZE)
2007-10-30 19:47:36 -07:00
2007-10-31 00:10:40 -07:00
write_enabler_nodeid, write_enabler,
data_length, extra_least_offset) = \
2007-11-06 18:49:59 -07:00
struct.unpack(">32s20s32sQQ", data)
2007-10-30 19:47:36 -07:00
assert magic == self.MAGIC
2008-01-14 11:58:58 -07:00
self.parent = parent # for logging
2007-10-30 19:47:36 -07:00
2008-01-14 11:58:58 -07:00
def log(self, *args, **kwargs):
    """Forward a log call, unchanged, to the parent given at construction."""
    parent = self.parent
    return parent.log(*args, **kwargs)
2007-10-30 19:47:36 -07:00
def create(self, my_nodeid, write_enabler):
assert not os.path.exists(self.home)
2007-10-31 00:10:40 -07:00
data_length = 0
extra_lease_offset = (self.HEADER_SIZE
+ 4 * self.LEASE_SIZE
+ data_length)
assert extra_lease_offset == self.DATA_OFFSET # true at creation
num_extra_leases = 0
2007-10-30 19:47:36 -07:00
f = open(self.home, 'wb')
2007-11-06 18:49:59 -07:00
header = struct.pack(">32s20s32sQQ",
2007-10-30 19:47:36 -07:00
self.MAGIC, my_nodeid, write_enabler,
2007-10-31 00:10:40 -07:00
data_length, extra_lease_offset,
2007-10-30 19:47:36 -07:00
2007-10-31 00:10:40 -07:00
leases = ("\x00"*self.LEASE_SIZE) * 4
f.write(header + leases)
2007-10-30 19:47:36 -07:00
# data goes here, empty after creation
2007-10-31 00:10:40 -07:00
f.write(struct.pack(">L", num_extra_leases))
2007-10-30 19:47:36 -07:00
# extra leases go here, none at creation
2007-10-31 00:10:40 -07:00
def _read_data_length(self, f):
(data_length,) = struct.unpack(">Q", f.read(8))
return data_length
2007-10-30 19:47:36 -07:00
2007-10-31 00:10:40 -07:00
def _write_data_length(self, f, data_length):
f.write(struct.pack(">Q", data_length))
def _read_share_data(self, f, offset, length):
2007-10-30 19:47:36 -07:00
precondition(offset >= 0)
2007-10-31 00:10:40 -07:00
data_length = self._read_data_length(f)
if offset+length > data_length:
2007-10-30 19:47:36 -07:00
# reads beyond the end of the data are truncated. Reads that
# start beyond the end of the data return an empty string.
2007-10-31 00:10:40 -07:00
length = max(0, data_length-offset)
2007-10-30 19:47:36 -07:00
if length == 0:
return ""
2007-10-31 00:10:40 -07:00
precondition(offset+length <= data_length)
2007-10-30 19:47:36 -07:00
2007-10-31 00:10:40 -07:00
data = f.read(length)
return data
def _read_extra_lease_offset(self, f):
(extra_lease_offset,) = struct.unpack(">Q", f.read(8))
return extra_lease_offset
def _write_extra_lease_offset(self, f, offset):
f.write(struct.pack(">Q", offset))
2007-10-30 19:47:36 -07:00
2007-10-31 00:10:40 -07:00
def _read_num_extra_leases(self, f):
offset = self._read_extra_lease_offset(f)
(num_extra_leases,) = struct.unpack(">L", f.read(4))
return num_extra_leases
def _write_num_extra_leases(self, f, num_leases):
extra_lease_offset = self._read_extra_lease_offset(f)
f.write(struct.pack(">L", num_leases))
def _change_container_size(self, f, new_container_size):
2007-10-30 19:47:36 -07:00
if new_container_size > self.MAX_SIZE:
raise DataTooLargeError()
2007-10-31 00:10:40 -07:00
old_extra_lease_offset = self._read_extra_lease_offset(f)
2007-10-30 19:47:36 -07:00
new_extra_lease_offset = self.DATA_OFFSET + new_container_size
2007-10-31 00:10:40 -07:00
if new_extra_lease_offset < old_extra_lease_offset:
# TODO: allow containers to shrink. For now they remain large.
2007-10-30 19:47:36 -07:00
2007-10-31 00:10:40 -07:00
num_extra_leases = self._read_num_extra_leases(f)
extra_lease_data = f.read(4 + num_extra_leases * self.LEASE_SIZE)
2007-10-30 19:47:36 -07:00
# an interrupt here will corrupt the leases, iff the move caused the
# extra leases to overlap.
2007-10-31 00:10:40 -07:00
self._write_extra_lease_offset(f, new_extra_lease_offset)
2007-10-30 19:47:36 -07:00
2007-10-31 00:10:40 -07:00
def _write_share_data(self, f, offset, data):
2007-10-30 19:47:36 -07:00
length = len(data)
precondition(offset >= 0)
2007-10-31 00:10:40 -07:00
data_length = self._read_data_length(f)
extra_lease_offset = self._read_extra_lease_offset(f)
if offset+length >= data_length:
# They are expanding their data size.
if self.DATA_OFFSET+offset+length > extra_lease_offset:
# Their new data won't fit in the current container, so we
# have to move the leases. With luck, they're expanding it
# more than the size of the extra lease block, which will
# minimize the corrupt-the-share window
self._change_container_size(f, offset+length)
extra_lease_offset = self._read_extra_lease_offset(f)
# an interrupt here is ok.. the container has been enlarged
# but the data remains untouched
assert self.DATA_OFFSET+offset+length <= extra_lease_offset
# Their data now fits in the current container. We must write
# their new data and modify the recorded data size.
new_data_length = offset+length
self._write_data_length(f, new_data_length)
2007-10-30 19:47:36 -07:00
# an interrupt here will result in a corrupted share
2007-10-31 00:10:40 -07:00
# now all that's left to do is write out their data
2007-10-30 19:47:36 -07:00
def _write_lease_record(self, f, lease_number, lease_info):
(ownerid, expiration_time,
renew_secret, cancel_secret, nodeid) = lease_info
2007-10-31 00:10:40 -07:00
extra_lease_offset = self._read_extra_lease_offset(f)
num_extra_leases = self._read_num_extra_leases(f)
2007-10-30 19:47:36 -07:00
if lease_number < 4:
offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
2007-10-31 00:10:40 -07:00
elif (lease_number-4) < num_extra_leases:
offset = (extra_lease_offset
2007-10-30 19:47:36 -07:00
+ 4
2007-10-31 00:38:30 -07:00
+ (lease_number-4)*self.LEASE_SIZE)
2007-10-30 19:47:36 -07:00
2007-10-31 00:10:40 -07:00
# must add an extra lease record
self._write_num_extra_leases(f, num_extra_leases+1)
offset = (extra_lease_offset
2007-10-30 19:47:36 -07:00
+ 4
2007-10-31 00:38:30 -07:00
+ (lease_number-4)*self.LEASE_SIZE)
2007-10-30 19:47:36 -07:00
assert f.tell() == offset
2007-11-06 18:49:59 -07:00
2007-10-30 19:47:36 -07:00
ownerid, int(expiration_time),
renew_secret, cancel_secret, nodeid))
def _read_lease_record(self, f, lease_number):
# returns a 5-tuple of lease info, or None
2007-10-31 00:10:40 -07:00
extra_lease_offset = self._read_extra_lease_offset(f)
num_extra_leases = self._read_num_extra_leases(f)
2007-10-30 19:47:36 -07:00
if lease_number < 4:
offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
2007-10-31 00:10:40 -07:00
elif (lease_number-4) < num_extra_leases:
offset = (extra_lease_offset
2007-10-30 19:47:36 -07:00
+ 4
2007-10-31 00:38:30 -07:00
+ (lease_number-4)*self.LEASE_SIZE)
2007-10-30 19:47:36 -07:00
raise IndexError("No such lease number %d" % lease_number)
assert f.tell() == offset
data = f.read(self.LEASE_SIZE)
2007-11-06 18:49:59 -07:00
lease_info = struct.unpack(">LL32s32s20s", data)
2007-10-30 19:47:36 -07:00
(ownerid, expiration_time,
renew_secret, cancel_secret, nodeid) = lease_info
if ownerid == 0:
return None
return lease_info
2007-10-31 00:10:40 -07:00
def _get_num_lease_slots(self, f):
# how many places do we have allocated for leases? Not all of them
# are filled.
num_extra_leases = self._read_num_extra_leases(f)
return 4+num_extra_leases
2007-10-30 19:47:36 -07:00
2007-10-31 00:10:40 -07:00
def _get_first_empty_lease_slot(self, f):
# return an int with the index of an empty slot, or None if we do not
# currently have an empty slot
for i in range(self._get_num_lease_slots(f)):
if self._read_lease_record(f, i) is None:
return i
return None
2007-10-30 19:47:36 -07:00
2007-10-31 12:07:47 -07:00
def _enumerate_leases(self, f):
2007-10-30 19:47:36 -07:00
"""Yields (leasenum, (ownerid, expiration_time, renew_secret,
cancel_secret, accepting_nodeid)) for all leases."""
2007-10-31 00:10:40 -07:00
for i in range(self._get_num_lease_slots(f)):
2007-10-30 19:47:36 -07:00
data = self._read_lease_record(f, i)
if data is not None:
yield (i,data)
except IndexError:
2007-10-31 12:31:33 -07:00
def debug_get_leases(self):
2007-10-31 12:07:47 -07:00
f = open(self.home, 'rb')
leases = list(self._enumerate_leases(f))
return leases
2007-10-30 19:47:36 -07:00
def add_lease(self, lease_info):
f = open(self.home, 'rb+')
2007-10-31 00:10:40 -07:00
num_lease_slots = self._get_num_lease_slots(f)
empty_slot = self._get_first_empty_lease_slot(f)
if empty_slot is not None:
self._write_lease_record(f, empty_slot, lease_info)
self._write_lease_record(f, num_lease_slots, lease_info)
2007-10-30 19:47:36 -07:00
def renew_lease(self, renew_secret, new_expire_time):
accepting_nodeids = set()
f = open(self.home, 'rb+')
2007-10-31 12:07:47 -07:00
for (leasenum,(oid,et,rs,cs,anid)) in self._enumerate_leases(f):
2007-10-30 19:47:36 -07:00
if rs == renew_secret:
# yup. See if we need to update the owner time.
if new_expire_time > et:
# yes
new_lease = (oid,new_expire_time,rs,cs,anid)
self._write_lease_record(f, leasenum, new_lease)
2007-10-31 00:10:40 -07:00
# Return the accepting_nodeids set, to give the client a chance to
# update the leases on a share which has been migrated from its
2007-10-30 19:47:36 -07:00
# original server to a new one.
2007-10-31 00:10:40 -07:00
msg = ("Unable to renew non-existent lease. I have leases accepted by"
" nodeids: ")
2007-11-06 18:49:59 -07:00
msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
2007-10-31 00:10:40 -07:00
for anid in accepting_nodeids])
msg += " ."
raise IndexError(msg)
2007-10-30 19:47:36 -07:00
def add_or_renew_lease(self, lease_info):
ownerid, expire_time, renew_secret, cancel_secret, anid = lease_info
self.renew_lease(renew_secret, expire_time)
except IndexError:
def cancel_lease(self, cancel_secret):
"""Remove any leases with the given cancel_secret. Return
(num_remaining_leases, space_freed). Raise IndexError if there was no
lease with the given cancel_secret."""
2007-10-31 00:10:40 -07:00
accepting_nodeids = set()
2007-10-30 19:47:36 -07:00
modified = 0
remaining = 0
2007-11-06 18:49:59 -07:00
blank_lease = (0, 0, "\x00"*32, "\x00"*32, "\x00"*20)
2007-10-30 19:47:36 -07:00
f = open(self.home, 'rb+')
2007-10-31 12:07:47 -07:00
for (leasenum,(oid,et,rs,cs,anid)) in self._enumerate_leases(f):
2007-10-31 00:10:40 -07:00
2007-10-30 19:47:36 -07:00
if cs == cancel_secret:
self._write_lease_record(f, leasenum, blank_lease)
modified += 1
remaining += 1
if modified:
freed_space = self._pack_leases(f)
2007-10-31 00:10:40 -07:00
2007-10-31 01:31:56 -07:00
return (remaining, freed_space)
2007-10-31 00:10:40 -07:00
msg = ("Unable to cancel non-existent lease. I have leases "
"accepted by nodeids: ")
2007-11-06 18:49:59 -07:00
msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
2007-10-31 00:10:40 -07:00
for anid in accepting_nodeids])
msg += " ."
raise IndexError(msg)
2007-10-30 19:47:36 -07:00
def _pack_leases(self, f):
# TODO: reclaim space from cancelled leases
return 0
2007-10-31 00:10:40 -07:00
def _read_write_enabler_and_nodeid(self, f):
data = f.read(self.HEADER_SIZE)
write_enabler_nodeid, write_enabler,
data_length, extra_least_offset) = \
2007-11-06 18:49:59 -07:00
struct.unpack(">32s20s32sQQ", data)
2007-10-31 00:10:40 -07:00
assert magic == self.MAGIC
return (write_enabler, write_enabler_nodeid)
2007-11-05 00:37:01 -07:00
def readv(self, readv):
datav = []
f = open(self.home, 'rb')
for (offset, length) in readv:
datav.append(self._read_share_data(f, offset, length))
return datav
2007-11-05 20:17:14 -07:00
# def remote_get_length(self):
# f = open(self.home, 'rb')
# data_length = self._read_data_length(f)
# f.close()
# return data_length
2007-10-31 00:10:40 -07:00
2008-01-14 11:58:58 -07:00
def check_write_enabler(self, write_enabler, si_s):
2007-10-31 00:10:40 -07:00
f = open(self.home, 'rb+')
(real_write_enabler, write_enabler_nodeid) = \
2007-11-05 20:17:14 -07:00
2007-10-31 00:10:40 -07:00
if write_enabler != real_write_enabler:
2007-10-30 19:47:36 -07:00
# accommodate share migration by reporting the nodeid used for the
# old write enabler.
2008-01-14 11:58:58 -07:00
self.log(format="bad write enabler on SI %(si)s,"
" recorded by nodeid %(nodeid)s",
si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid))
2007-10-30 19:47:36 -07:00
msg = "The write enabler was recorded by nodeid '%s'." % \
2007-11-06 18:49:59 -07:00
2007-10-31 00:10:40 -07:00
raise BadWriteEnablerError(msg)
2007-11-05 20:17:14 -07:00
def check_testv(self, testv):
test_good = True
f = open(self.home, 'rb+')
2007-10-30 19:47:36 -07:00
for (offset, length, operator, specimen) in testv:
2007-10-31 00:10:40 -07:00
data = self._read_share_data(f, offset, length)
2007-12-05 00:20:34 -07:00
if not testv_compare(data, operator, specimen):
2007-11-05 20:17:14 -07:00
test_good = False
return test_good
def writev(self, datav, new_length):
f = open(self.home, 'rb+')
2007-10-30 19:47:36 -07:00
for (offset, data) in datav:
2007-10-31 00:10:40 -07:00
self._write_share_data(f, offset, data)
2007-10-30 19:47:36 -07:00
if new_length is not None:
2007-10-31 00:10:40 -07:00
self._change_container_size(f, new_length)
2007-10-30 19:47:36 -07:00
2007-10-31 00:10:40 -07:00
f.write(struct.pack(">Q", new_length))
2007-10-30 19:47:36 -07:00
2007-12-05 00:20:34 -07:00
def testv_compare(a, op, b):
assert op in ("lt", "le", "eq", "ne", "ge", "gt")
if op == "lt":
return a < b
if op == "le":
return a <= b
if op == "eq":
return a == b
if op == "ne":
return a != b
if op == "ge":
return a >= b
if op == "gt":
return a > b
# never reached
2007-11-05 20:17:14 -07:00
class EmptyShare:
    """Stand-in for a share with no data: every read yields ""."""

    def check_testv(self, testv):
        """Return True iff every test in testv passes against empty data.

        testv is an iterable of (offset, length, operator, specimen); the
        offset and length are ignored because the data is always "".
        """
        outcomes = [testv_compare("", operator, specimen)
                    for (offset, length, operator, specimen) in testv]
        return all(outcomes)
2008-01-14 11:58:58 -07:00
def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
    """Create a new mutable share container at filename and return it.

    A throwaway MutableShareFile writes the file; a second instance is then
    constructed on the finished file and returned.
    """
    MutableShareFile(filename, parent).create(my_nodeid, write_enabler)
    return MutableShareFile(filename, parent)
2007-10-30 19:47:36 -07:00
2006-11-30 20:14:23 -07:00
class StorageServer(service.MultiService, Referenceable):
2008-01-31 21:10:15 -07:00
implements(RIStorageServer, IStatsProducer)
2008-02-05 20:28:59 -07:00
name = 'storage'
2006-11-30 20:14:23 -07:00
2008-03-13 09:10:11 -07:00
OLDEST_SUPPORTED_VERSION = LooseVersion("0.8.0")
2008-02-05 13:05:13 -07:00
def __init__(self, storedir, sizelimit=None,
discard_storage=False, readonly_storage=False,
2007-07-03 17:08:02 -07:00
2007-03-29 20:19:52 -07:00
self.storedir = storedir
2007-08-27 23:41:40 -07:00
sharedir = os.path.join(storedir, "shares")
self.sharedir = sharedir
2007-07-03 17:08:02 -07:00
self.sizelimit = sizelimit
2008-02-05 13:05:13 -07:00
self.no_storage = discard_storage
self.readonly_storage = readonly_storage
stats: add a simple stats gathering system
We have a desire to collect runtime statistics from multiple nodes primarily
for server monitoring purposes. This implements a simple implementation of
such a system, as a skeleton to build more sophistication upon.
Each client now looks for a 'stats_gatherer.furl' config file. If it has
been configured to use a stats gatherer, then it instantiates internally
a StatsProvider. This is a central place for code which wishes to offer
stats up for monitoring to report them to, either by calling
stats_provider.count('stat.name', value) to increment a counter, or by
registering a class as a stats producer with sp.register_producer(obj).
The StatsProvider connects to the StatsGatherer server and provides its
provider upon startup. The StatsGatherer is then responsible for polling
the attached providers periodically to retrieve the data provided.
The provider queries each registered producer when the gatherer queries
the provider. Both the internal 'counters' and the queried 'stats' are
then reported to the gatherer.
This provides a simple gatherer app, (c.f. make stats-gatherer-run)
which prints its furl and listens for incoming connections. Once a
minute, the gatherer polls all connected providers, and writes the
retrieved data into a pickle file.
Also included is a munin plugin which knows how to read the gatherer's
stats.pickle and output data munin can interpret. this plugin,
tahoe-stats.py can be symlinked as multiple different names within
munin's 'plugins' directory, and inspects argv to determine which
data to display, doing a lookup in a table within that file.
It looks in the environment for 'statsfile' to determine the path to
the gatherer's stats.pickle. An example plugins-conf.d file is
2008-01-30 20:11:07 -07:00
self.stats_provider = stats_provider
if self.stats_provider:
2007-08-27 23:41:40 -07:00
self.incomingdir = os.path.join(sharedir, 'incoming')
2007-03-30 10:52:19 -07:00
2007-07-03 17:08:02 -07:00
self._active_writers = weakref.WeakKeyDictionary()
2008-02-05 13:05:13 -07:00
lp = log.msg("StorageServer created, now measuring space..",
2008-04-08 11:36:56 -07:00
self.consumed = None
if self.sizelimit:
self.consumed = fileutil.du(self.sharedir)
log.msg(format="space measurement done, consumed=%(consumed)d bytes",
parent=lp, facility="tahoe.storage")
2006-11-30 20:14:23 -07:00
2008-01-14 11:58:58 -07:00
def log(self, *args, **kwargs):
    """Emit a log message, defaulting its facility to "tahoe.storage"."""
    kwargs.setdefault("facility", "tahoe.storage")
    return log.msg(*args, **kwargs)
2007-11-07 14:14:54 -07:00
2007-10-31 00:10:40 -07:00
def setNodeID(self, nodeid):
    """Record nodeid as this server's node id.

    Somebody must call this before any slots can be created or leases
    added.
    """
    setattr(self, "my_nodeid", nodeid)
def startService(self):
if self.parent:
nodeid = self.parent.nodeid # 20 bytes, binary
assert len(nodeid) == 20
2007-11-06 18:49:59 -07:00
2007-10-31 00:10:40 -07:00
2007-03-30 10:52:19 -07:00
def _clean_incomplete(self):
2007-03-29 20:19:52 -07:00
stats: add a simple stats gathering system
We have a desire to collect runtime statistics from multiple nodes primarily
for server monitoring purposes. This implements a simple implementation of
such a system, as a skeleton to build more sophistication upon.
Each client now looks for a 'stats_gatherer.furl' config file. If it has
been configured to use a stats gatherer, then it instantiates internally
a StatsProvider. This is a central place for code which wishes to offer
stats up for monitoring to report them to, either by calling
stats_provider.count('stat.name', value) to increment a counter, or by
registering a class as a stats producer with sp.register_producer(obj).
The StatsProvider connects to the StatsGatherer server and provides its
provider upon startup. The StatsGatherer is then responsible for polling
the attached providers periodically to retrieve the data provided.
The provider queries each registered producer when the gatherer queries
the provider. Both the internal 'counters' and the queried 'stats' are
then reported to the gatherer.
This provides a simple gatherer app, (c.f. make stats-gatherer-run)
which prints its furl and listens for incoming connections. Once a
minute, the gatherer polls all connected providers, and writes the
retrieved data into a pickle file.
Also included is a munin plugin which knows how to read the gatherer's
stats.pickle and output data munin can interpret. this plugin,
tahoe-stats.py can be symlinked as multiple different names within
munin's 'plugins' directory, and inspects argv to determine which
data to display, doing a lookup in a table within that file.
It looks in the environment for 'statsfile' to determine the path to
the gatherer's stats.pickle. An example plugins-conf.d file is
2008-01-30 20:11:07 -07:00
def get_stats(self):
    """Return a dict of storage statistics.

    'storage_server.allocated' is always present; 'storage_server.consumed'
    is included only when self.consumed is not None.
    """
    report = {'storage_server.allocated': self.allocated_size()}
    consumed = self.consumed
    if consumed is not None:
        report['storage_server.consumed'] = consumed
    return report
stats: add a simple stats gathering system
We have a desire to collect runtime statistics from multiple nodes primarily
for server monitoring purposes. This implements a simple implementation of
such a system, as a skeleton to build more sophistication upon.
Each client now looks for a 'stats_gatherer.furl' config file. If it has
been configured to use a stats gatherer, then it instantiates internally
a StatsProvider. This is a central place for code which wishes to offer
stats up for monitoring to report them to, either by calling
stats_provider.count('stat.name', value) to increment a counter, or by
registering a class as a stats producer with sp.register_producer(obj).
The StatsProvider connects to the StatsGatherer server and provides its
provider upon startup. The StatsGatherer is then responsible for polling
the attached providers periodically to retrieve the data provided.
The provider queries each registered producer when the gatherer queries
the provider. Both the internal 'counters' and the queried 'stats' are
then reported to the gatherer.
This provides a simple gatherer app, (c.f. make stats-gatherer-run)
which prints its furl and listens for incoming connections. Once a
minute, the gatherer polls all connected providers, and writes the
retrieved data into a pickle file.
Also included is a munin plugin which knows how to read the gatherer's
stats.pickle and output data munin can interpret. this plugin,
tahoe-stats.py can be symlinked as multiple different names within
munin's 'plugins' directory, and inspects argv to determine which
data to display, doing a lookup in a table within that file.
It looks in the environment for 'statsfile' to determine the path to
the gatherer's stats.pickle. An example plugins-conf.d file is
2008-01-30 20:11:07 -07:00
2007-07-03 17:08:02 -07:00
def allocated_size(self):
# Return the amount of space this server currently accounts for:
# self.consumed (treated as 0 when it is None or otherwise falsy) plus
# the allocated_size() reported by each entry of self._active_writers.
2008-04-08 11:36:56 -07:00
space = self.consumed or 0
2007-07-03 17:08:02 -07:00
for bw in self._active_writers:
space += bw.allocated_size()
return space
2008-02-05 13:05:13 -07:00
def remote_get_versions(self):
# Return a 2-tuple of strings: allmydata.__version__ and
# self.OLDEST_SUPPORTED_VERSION, each passed through str().
return (str(allmydata.__version__), str(self.OLDEST_SUPPORTED_VERSION))
2007-08-27 17:28:51 -07:00
def remote_allocate_buckets(self, storage_index,
renew_secret, cancel_secret,
sharenums, allocated_size,
2007-09-02 14:47:15 -07:00
canary, owner_num=0):
# Handle a request to allocate share buckets for storage_index.
# Returns the tuple (alreadygot, bucketwriters): alreadygot starts as an
# empty set and bucketwriters maps shnum -> BucketWriter for each share
# this call agreed to accept.
# NOTE(review): this listing has lost its indentation and some lines (the
# rest of the chain(...) call, and the statements that fill alreadygot),
# so the exact nesting below must be confirmed against the full source.
# owner_num is not for clients to set, but rather it should be
# curried into the PersonalStorageServer instance that is dedicated
# to a particular owner.
2007-03-30 10:52:19 -07:00
alreadygot = set()
bucketwriters = {} # k: shnum, v: BucketWriter
2008-01-31 17:48:48 -07:00
si_dir = storage_index_to_dir(storage_index)
2008-03-27 17:33:58 -07:00
si_s = si_b2a(storage_index)
log.msg("storage: allocate_buckets %s" % si_s)
2007-09-02 14:47:15 -07:00
# in this implementation, the lease information (including secrets)
# goes into the share files themselves. It could also be put into a
# separate database. Note that the lease should not be added until
# the BucketWrite has been closed.
# the lease expires 31 days from now
expire_time = time.time() + 31*24*60*60
lease_info = (owner_num, renew_secret, cancel_secret, expire_time)
2007-07-13 15:09:01 -07:00
space_per_bucket = allocated_size
2007-07-03 17:08:02 -07:00
# a sizelimit of None means no limit is enforced; otherwise work out how
# much room is left after what allocated_size() already accounts for
no_limits = self.sizelimit is None
yes_limits = not no_limits
if yes_limits:
remaining_space = self.sizelimit - self.allocated_size()
2007-09-17 00:48:40 -07:00
# fill alreadygot with all shares that we have, not just the ones
# they asked about: this will save them a lot of work. Add or update
# leases for all of them: if they want us to hold shares for this
# file, they'll want us to hold leases for this file.
for (shnum, fn) in chain(self._get_bucket_shares(storage_index),
sf = ShareFile(fn)
2008-02-05 13:05:13 -07:00
if self.readonly_storage:
# we won't accept new shares
return alreadygot, bucketwriters
2007-03-30 10:52:19 -07:00
for shnum in sharenums:
2008-01-31 17:48:48 -07:00
incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
2007-03-30 10:52:19 -07:00
if os.path.exists(incominghome) or os.path.exists(finalhome):
2007-09-17 00:48:40 -07:00
# great! we already have it. easy.
2007-07-03 17:08:02 -07:00
elif no_limits or remaining_space >= space_per_bucket:
2007-09-17 00:48:40 -07:00
# ok! we need to create the new share file.
2007-07-03 17:08:02 -07:00
bw = BucketWriter(self, incominghome, finalhome,
2008-01-14 21:22:55 -07:00
space_per_bucket, lease_info, canary)
2007-07-16 18:07:03 -07:00
if self.no_storage:
bw.throw_out_all_data = True
2007-07-03 17:08:02 -07:00
bucketwriters[shnum] = bw
# register the writer so allocated_size() counts its reservation
self._active_writers[bw] = 1
if yes_limits:
remaining_space -= space_per_bucket
2007-03-30 10:52:19 -07:00
2007-09-17 00:48:40 -07:00
# bummer! not enough space to accept this bucket
2007-07-03 17:08:02 -07:00
2007-07-13 14:04:49 -07:00
# only create the final share directory when at least one writer was made
if bucketwriters:
2008-01-31 17:48:48 -07:00
fileutil.make_dirs(os.path.join(self.sharedir, si_dir))
2007-08-27 23:41:40 -07:00
2007-03-30 10:52:19 -07:00
return alreadygot, bucketwriters
2006-11-30 20:14:23 -07:00
2007-08-27 23:41:40 -07:00
def remote_renew_lease(self, storage_index, renew_secret):
# Renew the lease matching renew_secret on the shares of storage_index,
# giving it a new expiry 31 days from now.
# Raises IndexError("no such lease to renew") when _get_bucket_shares()
# yields no shares at all.
# NOTE(review): no f.close() and no else: are visible in this listing;
# confirm in the full source that the file handle is closed and what
# happens to `sf` when a file is neither share format.
2007-09-02 14:47:15 -07:00
new_expire_time = time.time() + 31*24*60*60
found_buckets = False
for shnum, filename in self._get_bucket_shares(storage_index):
found_buckets = True
2007-10-31 01:31:56 -07:00
# the first 32 bytes decide which wrapper class handles the file
f = open(filename, 'rb')
header = f.read(32)
if header[:32] == MutableShareFile.MAGIC:
2008-01-14 11:58:58 -07:00
sf = MutableShareFile(filename, self)
2007-10-31 01:31:56 -07:00
# note: if the share has been migrated, the renew_lease()
# call will throw an exception, with information to help the
# client update the lease.
# otherwise a leading big-endian 1 (four bytes) selects ShareFile
elif header[:4] == struct.pack(">L", 1):
sf = ShareFile(filename)
pass # non-sharefile
2007-09-02 14:47:15 -07:00
sf.renew_lease(renew_secret, new_expire_time)
if not found_buckets:
raise IndexError("no such lease to renew")
2007-08-27 23:41:40 -07:00
def remote_cancel_lease(self, storage_index, cancel_secret):
# Cancel the lease matching cancel_secret on each share of storage_index
# and tally the space freed.
# Raises IndexError("no such lease to cancel") when _get_bucket_shares()
# yields no shares; sf.cancel_lease() is also documented below as raising
# IndexError when the lease is absent.
# NOTE(review): indentation and some lines (else:, the file/directory
# removal calls, any f.close()) are missing from this listing, so the
# nesting of the statements below must be confirmed against the full source.
2008-01-31 16:26:28 -07:00
storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
2007-09-02 14:47:15 -07:00
remaining_files = 0
total_space_freed = 0
found_buckets = False
for shnum, filename in self._get_bucket_shares(storage_index):
# note: if we can't find a lease on one share, we won't bother
# looking in the others. Unless something broke internally
# (perhaps we ran out of disk space while adding a lease), the
# leases on all shares will be identical.
found_buckets = True
2007-10-31 01:31:56 -07:00
# the first 32 bytes decide which wrapper class handles the file
f = open(filename, 'rb')
header = f.read(32)
if header[:32] == MutableShareFile.MAGIC:
2008-01-14 11:58:58 -07:00
sf = MutableShareFile(filename, self)
2007-10-31 01:31:56 -07:00
# note: if the share has been migrated, the renew_lease()
# call will throw an exception, with information to help the
# client update the lease.
elif header[:4] == struct.pack(">L", 1):
sf = ShareFile(filename)
pass # non-sharefile
2007-09-02 14:47:15 -07:00
# this raises IndexError if the lease wasn't present
remaining_leases, space_freed = sf.cancel_lease(cancel_secret)
total_space_freed += space_freed
if remaining_leases:
remaining_files += 1
# now remove the sharefile. We'll almost certainly be
# removing the entire directory soon.
# the file's on-disk size is added to the freed total
filelen = os.stat(filename)[stat.ST_SIZE]
total_space_freed += filelen
if not remaining_files:
2007-10-31 01:44:01 -07:00
2008-04-08 11:36:56 -07:00
# self.consumed is only adjusted when it is not None
if self.consumed is not None:
self.consumed -= total_space_freed
stats: add a simple stats gathering system
We have a desire to collect runtime statistics from multiple nodes primarily
for server monitoring purposes. This implements a simple implementation of
such a system, as a skeleton to build more sophistication upon.
Each client now looks for a 'stats_gatherer.furl' config file. If it has
been configured to use a stats gatherer, then it instantiates internally
a StatsProvider. This is a central place for code which wishes to offer
stats up for monitoring to report them to, either by calling
stats_provider.count('stat.name', value) to increment a counter, or by
registering a class as a stats producer with sp.register_producer(obj).
The StatsProvider connects to the StatsGatherer server and provides its
provider upon startup. The StatsGatherer is then responsible for polling
the attached providers periodically to retrieve the data provided.
The provider queries each registered producer when the gatherer queries
the provider. Both the internal 'counters' and the queried 'stats' are
then reported to the gatherer.
This provides a simple gatherer app, (c.f. make stats-gatherer-run)
which prints its furl and listens for incoming connections. Once a
minute, the gatherer polls all connected providers, and writes the
retrieved data into a pickle file.
Also included is a munin plugin which knows how to read the gatherer's
stats.pickle and output data munin can interpret. this plugin,
tahoe-stats.py can be symlinked as multiple different names within
munin's 'plugins' directory, and inspects argv to determine which
data to display, doing a lookup in a table within that file.
It looks in the environment for 'statsfile' to determine the path to
the gatherer's stats.pickle. An example plugins-conf.d file is
2008-01-30 20:11:07 -07:00
# report the freed byte count to the stats provider, if one is set
if self.stats_provider:
self.stats_provider.count('storage_server.bytes_freed', total_space_freed)
2007-09-02 14:47:15 -07:00
if not found_buckets:
raise IndexError("no such lease to cancel")
2007-08-27 23:41:40 -07:00
2007-07-03 17:38:49 -07:00
def bucket_writer_closed(self, bw, consumed_size):
# Bookkeeping for writer `bw` (presumably invoked when it is closed):
# add consumed_size to self.consumed when that is not None, count it
# under 'storage_server.bytes_added' if a stats provider is set, and
# remove bw from self._active_writers.
2008-04-08 11:36:56 -07:00
if self.consumed is not None:
self.consumed += consumed_size
stats: add a simple stats gathering system
We have a desire to collect runtime statistics from multiple nodes primarily
for server monitoring purposes. This implements a simple implementation of
such a system, as a skeleton to build more sophistication upon.
Each client now looks for a 'stats_gatherer.furl' config file. If it has
been configured to use a stats gatherer, then it instantiates internally
a StatsProvider. This is a central place for code which wishes to offer
stats up for monitoring to report them to, either by calling
stats_provider.count('stat.name', value) to increment a counter, or by
registering a class as a stats producer with sp.register_producer(obj).
The StatsProvider connects to the StatsGatherer server and provides its
provider upon startup. The StatsGatherer is then responsible for polling
the attached providers periodically to retrieve the data provided.
The provider queries each registered producer when the gatherer queries
the provider. Both the internal 'counters' and the queried 'stats' are
then reported to the gatherer.
This provides a simple gatherer app, (c.f. make stats-gatherer-run)
which prints its furl and listens for incoming connections. Once a
minute, the gatherer polls all connected providers, and writes the
retrieved data into a pickle file.
Also included is a munin plugin which knows how to read the gatherer's
stats.pickle and output data munin can interpret. this plugin,
tahoe-stats.py can be symlinked as multiple different names within
munin's 'plugins' directory, and inspects argv to determine which
data to display, doing a lookup in a table within that file.
It looks in the environment for 'statsfile' to determine the path to
the gatherer's stats.pickle. An example plugins-conf.d file is
2008-01-30 20:11:07 -07:00
if self.stats_provider:
self.stats_provider.count('storage_server.bytes_added', consumed_size)
2007-07-03 17:08:02 -07:00
# raises KeyError if bw is not a key of self._active_writers
del self._active_writers[bw]
2007-09-02 14:47:15 -07:00
def _get_bucket_shares(self, storage_index):
"""Return a list of (shnum, pathname) tuples for files that hold
shares for this storage_index. In each tuple, 'shnum' will always be
the integer form of the last component of 'pathname'."""
# The body uses `yield`, so callers receive a generator rather than an
# actual list.
# NOTE(review): the try: paired with `except OSError` and the handler's
# body are not visible in this listing.
2008-01-31 16:26:28 -07:00
storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
2007-03-30 17:12:07 -07:00
2007-06-01 18:48:01 -07:00
for f in os.listdir(storagedir):
2007-04-18 07:41:56 -07:00
# only directory entries whose names match NUM_RE are reported
if NUM_RE.match(f):
2007-09-02 14:47:15 -07:00
filename = os.path.join(storagedir, f)
yield (int(f), filename)
2007-03-30 17:12:07 -07:00
except OSError:
# Commonly caused by there being no buckets at all.
2007-09-17 00:48:40 -07:00
def _get_incoming_shares(self, storage_index):
# Generator yielding (shnum, pathname) for each entry of the incoming
# directory for storage_index (under self.incomingdir) whose name matches
# NUM_RE; shnum is int() of the entry name.
# NOTE(review): the try: paired with `except OSError` and the handler's
# body are not visible in this listing.
2008-01-31 16:26:28 -07:00
incomingdir = os.path.join(self.incomingdir, storage_index_to_dir(storage_index))
2007-09-17 00:48:40 -07:00
for f in os.listdir(incomingdir):
if NUM_RE.match(f):
filename = os.path.join(incomingdir, f)
yield (int(f), filename)
except OSError:
2007-09-02 14:47:15 -07:00
def remote_get_buckets(self, storage_index):
# Log the request, then return a dict mapping share number -> BucketReader
# for every (shnum, filename) yielded by _get_bucket_shares(storage_index).
# The dict is empty when no shares are yielded.
2008-03-27 17:33:58 -07:00
si_s = si_b2a(storage_index)
log.msg("storage: get_buckets %s" % si_s)
2007-09-02 14:47:15 -07:00
bucketreaders = {} # k: sharenum, v: BucketReader
for shnum, filename in self._get_bucket_shares(storage_index):
bucketreaders[shnum] = BucketReader(filename)
2007-03-29 20:19:52 -07:00
return bucketreaders
2007-07-08 23:27:46 -07:00
2007-09-02 14:47:15 -07:00
def get_leases(self, storage_index):
# Returns sf.iter_leases() for the first share yielded by
# _get_bucket_shares(storage_index), or an empty iterator when there are
# no shares (StopIteration from the Python 2 style .next() call).
# NOTE(review): the closing docstring quotes and the try: paired with
# `except StopIteration` are not visible in this listing.
"""Provide an iterator that yields all of the leases attached to this
bucket. Each lease is returned as a tuple of (owner_num,
renew_secret, cancel_secret, expiration_time).
This method is not for client use.
# since all shares get the same lease data, we just grab the leases
# from the first share
shnum, filename = self._get_bucket_shares(storage_index).next()
sf = ShareFile(filename)
return sf.iter_leases()
except StopIteration:
return iter([])
2007-11-05 20:17:14 -07:00
def remote_slot_testv_and_readv_and_writev(self, storage_index,
2008-02-12 20:48:37 -07:00
# Check the test vectors against the existing mutable shares of
# storage_index, collect the requested read data, and apply the write
# vectors only when every test passed.
# Returns the tuple (testv_is_good, read_data).
# NOTE(review): the rest of the signature and several lines (try:,
# continue, else:, call-argument continuations, the per-share lease
# update) are missing from this listing. `secrets`,
# `test_and_write_vectors` and `read_vector` are used below and are
# presumably the remaining parameters; confirm against the full source.
si_s = si_b2a(storage_index)
2008-03-27 17:33:58 -07:00
lp = log.msg("storage: slot_writev %s" % si_s)
2008-01-31 17:48:48 -07:00
si_dir = storage_index_to_dir(storage_index)
2007-11-05 20:17:14 -07:00
(write_enabler, renew_secret, cancel_secret) = secrets
2007-10-31 00:10:40 -07:00
# shares exist if there is a file for them
2008-01-31 17:48:48 -07:00
bucketdir = os.path.join(self.sharedir, si_dir)
2007-11-05 20:17:14 -07:00
shares = {}
if os.path.isdir(bucketdir):
for sharenum_s in os.listdir(bucketdir):
sharenum = int(sharenum_s)
except ValueError:
filename = os.path.join(bucketdir, sharenum_s)
2008-01-14 11:58:58 -07:00
msf = MutableShareFile(filename, self)
# the write enabler is checked on each existing share before any
# test, read or write is performed
msf.check_write_enabler(write_enabler, si_s)
2007-11-05 20:17:14 -07:00
shares[sharenum] = msf
# write_enabler is good for all existing shares.
# Now evaluate test vectors.
testv_is_good = True
for sharenum in test_and_write_vectors:
(testv, datav, new_length) = test_and_write_vectors[sharenum]
if sharenum in shares:
if not shares[sharenum].check_testv(testv):
2007-11-07 14:14:54 -07:00
self.log("testv failed: [%d]: %r" % (sharenum, testv))
2007-11-05 20:17:14 -07:00
testv_is_good = False
# compare the vectors against an empty share, in which all
# reads return empty strings.
if not EmptyShare().check_testv(testv):
2007-11-07 14:14:54 -07:00
self.log("testv failed (empty): [%d] %r" % (sharenum,
2007-11-05 20:17:14 -07:00
testv_is_good = False
# now gather the read vectors, before we do any writes
read_data = {}
for sharenum, share in shares.items():
read_data[sharenum] = share.readv(read_vector)
if testv_is_good:
# now apply the write vectors
for sharenum in test_and_write_vectors:
(testv, datav, new_length) = test_and_write_vectors[sharenum]
if sharenum not in shares:
# allocate a new share
allocated_size = 2000 # arbitrary, really
share = self._allocate_slot_share(bucketdir, secrets,
shares[sharenum] = share
shares[sharenum].writev(datav, new_length)
# and update the leases on all shares
ownerid = 1 # TODO
expire_time = time.time() + 31*24*60*60 # one month
my_nodeid = self.my_nodeid
anid = my_nodeid
lease_info = (ownerid, expire_time, renew_secret, cancel_secret,
for share in shares.values():
# all done
return (testv_is_good, read_data)
def _allocate_slot_share(self, bucketdir, secrets, sharenum,
allocated_size, owner_num=0):
# Create a new mutable share file named "%d" % sharenum inside bucketdir
# via create_mutable_sharefile() and return the resulting share object.
# NOTE(review): the create_mutable_sharefile(...) call is cut off in this
# listing; its remaining arguments are not visible.
(write_enabler, renew_secret, cancel_secret) = secrets
my_nodeid = self.my_nodeid
filename = os.path.join(bucketdir, "%d" % sharenum)
2008-01-14 11:58:58 -07:00
share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
2007-11-05 20:17:14 -07:00
return share
2007-10-31 00:10:40 -07:00
2007-11-05 20:17:14 -07:00
def remote_slot_readv(self, storage_index, shares, readv):
# Apply the read vector `readv` to the mutable shares of storage_index.
# Returns a dict mapping share number -> msf.readv(readv), or {} when the
# bucket directory does not exist. An empty `shares` selects every share
# found in the directory.
# NOTE(review): the try: and the body of `except ValueError` around the
# int() conversion are not visible in this listing.
2008-02-12 20:48:37 -07:00
si_s = si_b2a(storage_index)
2008-01-31 17:48:48 -07:00
lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
facility="tahoe.storage", level=log.OPERATIONAL)
si_dir = storage_index_to_dir(storage_index)
2007-11-05 00:37:01 -07:00
# shares exist if there is a file for them
2008-01-31 17:48:48 -07:00
bucketdir = os.path.join(self.sharedir, si_dir)
2007-11-05 00:37:01 -07:00
if not os.path.isdir(bucketdir):
return {}
datavs = {}
for sharenum_s in os.listdir(bucketdir):
sharenum = int(sharenum_s)
except ValueError:
2007-11-05 20:17:14 -07:00
if sharenum in shares or not shares:
filename = os.path.join(bucketdir, sharenum_s)
2008-01-14 11:58:58 -07:00
msf = MutableShareFile(filename, self)
2007-11-05 20:17:14 -07:00
datavs[sharenum] = msf.readv(readv)
2008-01-31 17:48:48 -07:00
# logged as a child (parent=lp) of the request message above
log.msg("returning shares %s" % (datavs.keys(),),
facility="tahoe.storage", level=log.NOISY, parent=lp)
2007-11-05 00:37:01 -07:00
return datavs
2007-09-02 14:47:15 -07:00
2007-10-30 19:47:36 -07:00
# the code before here runs on the storage server, not the client
# the code beyond here runs on the client, not the storage server
2007-07-13 14:04:49 -07:00
Share data is written into a single file. At the start of the file, there is
a series of four-byte big-endian offset values, which indicate where each
section starts. Each offset is measured from the beginning of the file.
2007-09-04 09:00:24 -07:00
0x00: version number (=00 00 00 01)
0x04: segment size
0x08: data size
0x0c: offset of data (=00 00 00 24)
0x10: offset of plaintext_hash_tree
0x14: offset of crypttext_hash_tree
0x18: offset of block_hashes
0x1c: offset of share_hashes
0x20: offset of uri_extension_length + uri_extension
0x24: start of data
2007-07-13 16:38:25 -07:00
? : start of plaintext_hash_tree
? : start of crypttext_hash_tree
? : start of block_hashes
? : start of share_hashes
2007-07-13 14:04:49 -07:00
each share_hash is written as a two-byte (big-endian) hashnum
followed by the 32-byte SHA-256 hash. We only store the hashes
necessary to validate the share hash root
2007-07-13 16:38:25 -07:00
? : start of uri_extension_length (four-byte big-endian value)
? : start of uri_extension
2007-07-13 14:04:49 -07:00
2007-07-13 15:09:01 -07:00
def allocated_size(data_size, num_segments, num_share_hashes,
# Compute the total size of a share with the given parameters. A
# WriteBucketProxy is built with rref=None, segment_size=0 and nodeid=None
# only to obtain its offset table; the result is the 'uri_extension'
# offset plus 4 bytes plus uri_extension_size.
# NOTE(review): the end of the signature is missing from this listing;
# uri_extension_size is presumably the remaining parameter.
wbp = WriteBucketProxy(None, data_size, 0, num_segments, num_share_hashes,
2008-01-28 18:53:51 -07:00
uri_extension_size, None)
2007-07-13 15:09:01 -07:00
uri_extension_starts_at = wbp._offsets['uri_extension']
return uri_extension_starts_at + 4 + uri_extension_size
2008-06-02 16:57:01 -07:00
class FileTooLargeError(Exception):
# Raised by WriteBucketProxy.__init__ when segment_size or data_size is
# >= 2**32, or when the computed 'uri_extension' offset is >= 2**32.
# NOTE(review): the class body is not visible in this listing.
2007-07-08 23:27:46 -07:00
class WriteBucketProxy:
# Client-side helper that lays out one share and writes each section to a
# remote bucket through self._rref.callRemote("write", offset, data).
# NOTE(review): this listing has lost indentation and some lines (else:
# branches, call-argument continuations); confirm details against the
# full source.
2007-07-13 14:04:49 -07:00
def __init__(self, rref, data_size, segment_size, num_segments,
2008-01-28 18:53:51 -07:00
num_share_hashes, uri_extension_size, nodeid):
# Record the share parameters and precompute the offset of every section.
# Raises FileTooLargeError when segment_size, data_size or the final
# offset is >= 2**32 (offsets are packed as four-byte ">L" fields).
2007-07-08 23:27:46 -07:00
self._rref = rref
2007-07-13 16:38:25 -07:00
self._data_size = data_size
2007-07-13 14:04:49 -07:00
self._segment_size = segment_size
2007-07-13 15:09:01 -07:00
self._num_segments = num_segments
2008-01-28 18:53:51 -07:00
self._nodeid = nodeid
2007-07-13 14:04:49 -07:00
2008-06-02 16:57:01 -07:00
if segment_size >= 2**32 or data_size >= 2**32:
raise FileTooLargeError("This file is too large to be uploaded (data_size).")
2007-07-13 19:30:21 -07:00
# each of the three hash-tree sections holds (2*effective_segments - 1)
# hashes of HASH_SIZE bytes; next_power_of_k(n, 2) presumably rounds
# num_segments up to a power of two -- verify in mathutil
effective_segments = mathutil.next_power_of_k(num_segments,2)
self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
2007-07-13 14:04:49 -07:00
# how many share hashes are included in each share? This will be
# about ln2(num_shares).
self._share_hash_size = num_share_hashes * (2+HASH_SIZE)
2007-07-13 15:09:01 -07:00
# we commit to not sending a uri extension larger than this
self._uri_extension_size = uri_extension_size
2007-07-13 14:04:49 -07:00
offsets = self._offsets = {}
2007-09-04 09:00:24 -07:00
# sections are laid out back to back, starting right after the
# 0x24-byte header
x = 0x24
2007-07-13 14:04:49 -07:00
offsets['data'] = x
x += data_size
offsets['plaintext_hash_tree'] = x
x += self._segment_hash_size
offsets['crypttext_hash_tree'] = x
x += self._segment_hash_size
offsets['block_hashes'] = x
x += self._segment_hash_size
offsets['share_hashes'] = x
x += self._share_hash_size
offsets['uri_extension'] = x
2008-06-02 16:57:01 -07:00
if x >= 2**32:
raise FileTooLargeError("This file is too large to be uploaded (offsets).")
2007-09-04 09:00:24 -07:00
# the header is nine big-endian four-byte fields (0x24 bytes in total).
# NOTE(review): the arguments after the version number are missing from
# this listing.
offset_data = struct.pack(">LLLLLLLLL",
1, # version number
2007-07-13 14:04:49 -07:00
2007-07-13 16:38:25 -07:00
2007-07-13 14:04:49 -07:00
2007-07-13 16:38:25 -07:00
2007-07-13 14:04:49 -07:00
2007-09-04 09:00:24 -07:00
assert len(offset_data) == 0x24
2007-07-13 14:04:49 -07:00
self._offset_data = offset_data
2008-01-28 18:53:51 -07:00
def __repr__(self):
# Describe the proxy by node id: idlib.nodeid_b2a(self._nodeid) when a
# node id is set, otherwise the literal "[None]" (the else: separating the
# two assignments is not visible in this listing).
if self._nodeid:
nodeid_s = idlib.nodeid_b2a(self._nodeid)
nodeid_s = "[None]"
return "<allmydata.storage.WriteBucketProxy for node %s>" % nodeid_s
2007-07-13 14:04:49 -07:00
def start(self):
# Write the precomputed header at offset 0; returns the result of _write().
return self._write(0, self._offset_data)
2007-07-08 23:27:46 -07:00
def put_block(self, segmentnum, data):
# Write one data segment at its position within the 'data' section.
# Segments before the last must be exactly _segment_size bytes; the second
# precondition (remaining bytes of _data_size) presumably applies to the
# last segment -- the else: between them is not visible in this listing.
2007-07-13 14:04:49 -07:00
offset = self._offsets['data'] + segmentnum * self._segment_size
assert offset + len(data) <= self._offsets['uri_extension']
assert isinstance(data, str)
2007-07-13 15:09:01 -07:00
if segmentnum < self._num_segments-1:
precondition(len(data) == self._segment_size,
len(data), self._segment_size)
2007-07-13 14:04:49 -07:00
2007-07-13 16:38:25 -07:00
precondition(len(data) == (self._data_size -
(self._segment_size *
(self._num_segments - 1))),
2007-07-13 15:09:01 -07:00
len(data), self._segment_size)
2007-07-13 14:04:49 -07:00
return self._write(offset, data)
2007-07-08 23:27:46 -07:00
def put_plaintext_hashes(self, hashes):
# Concatenate the list `hashes` and write it at the
# 'plaintext_hash_tree' offset; the data must be exactly
# _segment_hash_size bytes and must not run into the next section.
2007-07-13 14:04:49 -07:00
offset = self._offsets['plaintext_hash_tree']
assert isinstance(hashes, list)
data = "".join(hashes)
2007-07-13 19:30:48 -07:00
precondition(len(data) == self._segment_hash_size,
len(data), self._segment_hash_size)
precondition(offset+len(data) <= self._offsets['crypttext_hash_tree'],
offset, len(data), offset+len(data),
2007-07-13 14:04:49 -07:00
return self._write(offset, data)
2007-07-08 23:27:46 -07:00
def put_crypttext_hashes(self, hashes):
# Same as put_plaintext_hashes, for the 'crypttext_hash_tree' section
# (bounded by the 'block_hashes' offset).
2007-07-13 14:04:49 -07:00
offset = self._offsets['crypttext_hash_tree']
assert isinstance(hashes, list)
data = "".join(hashes)
2007-07-13 19:30:48 -07:00
precondition(len(data) == self._segment_hash_size,
len(data), self._segment_hash_size)
precondition(offset + len(data) <= self._offsets['block_hashes'],
offset, len(data), offset+len(data),
2007-07-13 14:04:49 -07:00
return self._write(offset, data)
2007-07-08 23:27:46 -07:00
def put_block_hashes(self, blockhashes):
# Same pattern for the 'block_hashes' section (bounded by the
# 'share_hashes' offset).
2007-07-13 14:04:49 -07:00
offset = self._offsets['block_hashes']
assert isinstance(blockhashes, list)
data = "".join(blockhashes)
2007-07-13 19:30:48 -07:00
precondition(len(data) == self._segment_hash_size,
len(data), self._segment_hash_size)
precondition(offset + len(data) <= self._offsets['share_hashes'],
offset, len(data), offset+len(data),
2007-07-13 14:04:49 -07:00
return self._write(offset, data)
2007-07-08 23:27:46 -07:00
def put_share_hashes(self, sharehashes):
# Serialize each (hashnum, hashvalue) pair as a two-byte big-endian hashnum
# followed by the hash value, and write the result at the 'share_hashes'
# offset; the total must equal _share_hash_size.
2007-07-13 14:04:49 -07:00
# sharehashes is a list of (index, hash) tuples, so they get stored
# as 2+32=34 bytes each
offset = self._offsets['share_hashes']
assert isinstance(sharehashes, list)
data = "".join([struct.pack(">H", hashnum) + hashvalue
for hashnum,hashvalue in sharehashes])
2007-07-13 15:09:01 -07:00
precondition(len(data) == self._share_hash_size,
len(data), self._share_hash_size)
2007-07-13 19:30:48 -07:00
precondition(offset + len(data) <= self._offsets['uri_extension'],
offset, len(data), offset+len(data),
2007-07-13 14:04:49 -07:00
return self._write(offset, data)
2007-07-08 23:27:46 -07:00
def put_uri_extension(self, data):
# Write a four-byte big-endian length followed by `data` at the
# 'uri_extension' offset; data may be shorter than, but not longer than,
# the size committed to in __init__.
2007-07-13 14:04:49 -07:00
offset = self._offsets['uri_extension']
assert isinstance(data, str)
2007-07-13 19:30:48 -07:00
precondition(len(data) <= self._uri_extension_size,
len(data), self._uri_extension_size)
2007-07-13 14:04:49 -07:00
length = struct.pack(">L", len(data))
return self._write(offset, length+data)
def _write(self, offset, data):
# Single point through which every section is sent to the remote bucket.
# TODO: for small shares, buffer the writes and do just a single call
return self._rref.callRemote("write", offset, data)
2007-07-08 23:27:46 -07:00
def close(self):
# Forward "close" to the remote bucket and return the call's result.
return self._rref.callRemote("close")
2008-01-14 21:22:55 -07:00
def abort(self):
# Forward "abort" to the remote bucket and return the call's result.
return self._rref.callRemote("abort")
2007-07-08 23:27:46 -07:00
class ReadBucketProxy:
2008-03-03 19:19:21 -07:00
def __init__(self, rref, peerid=None, storage_index_s=None):
# Store the remote reference plus the optional peer id and storage-index
# string; _started begins as False and is consulted by startIfNecessary().
2007-07-08 23:27:46 -07:00
self._rref = rref
2008-03-03 19:19:21 -07:00
self._peerid = peerid
2008-02-26 17:33:14 -07:00
self._si_s = storage_index_s
2007-07-13 15:09:01 -07:00
self._started = False
2007-07-08 23:27:46 -07:00
2008-03-03 20:30:35 -07:00
def get_peerid(self):
# Return the peerid passed to the constructor (None if it was omitted).
return self._peerid
2008-02-26 17:33:14 -07:00
def __repr__(self):
# Describe the proxy by abbreviated peer id and storage index.
# NOTE(review): peerid defaults to None in __init__; confirm that
# idlib.shortnodeid_b2a() accepts None. The final line of the format
# arguments is missing from this listing.
2008-03-03 19:19:21 -07:00
peerid_s = idlib.shortnodeid_b2a(self._peerid)
return "<ReadBucketProxy to peer [%s] SI %s>" % (peerid_s,
2008-02-26 17:33:14 -07:00
2007-07-13 14:04:49 -07:00
def startIfNecessary(self):
# Return a Deferred that fires with this proxy: immediately via
# defer.succeed() when self._started is already true, otherwise after
# start() has completed.
if self._started:
return defer.succeed(self)
d = self.start()
# replace start()'s result with the proxy itself
d.addCallback(lambda res: self)
return d
def start(self):
# Begin by reading the 0x24-byte header from offset 0 and return the
# resulting Deferred.
# NOTE(review): the callbacks attached to d (presumably header parsing and
# setting _started) are missing from this listing.
# TODO: for small shares, read the whole bucket in start()
2007-09-04 09:00:24 -07:00
d = self._read(0, 0x24)
2007-07-13 16:52:17 -07:00
2007-07-13 14:04:49 -07:00
return d
2007-07-13 16:52:17 -07:00
def _parse_offsets(self, data):
# Decode the 0x24-byte share header: three big-endian four-byte values
# (version, segment size, data size) followed by one four-byte offset per
# section name. Stores the sizes on self, fills self._offsets and
# returns it. The version must be 1.
# NOTE(review): the tuple of field names is cut off after 'data' in this
# listing.
2007-09-04 09:00:24 -07:00
precondition(len(data) == 0x24)
2007-07-13 16:52:17 -07:00
self._offsets = {}
2007-09-04 09:00:24 -07:00
(version, self._segment_size, self._data_size) = \
struct.unpack(">LLL", data[0:0xc])
_assert(version == 1)
# the section offsets start right after the three fields above
x = 0x0c
2007-07-13 16:52:17 -07:00
for field in ( 'data',
offset = struct.unpack(">L", data[x:x+4])[0]
x += 4
self._offsets[field] = offset
return self._offsets
2007-07-08 23:27:46 -07:00
def get_block(self, blocknum):
# Read block number `blocknum` from the 'data' section. Blocks before the
# last are _segment_size bytes long; the size derived from
# _data_size % _segment_size (falling back to _segment_size when the
# remainder is 0) presumably applies to the last block -- the else:
# separating the two cases is not visible in this listing.
2007-07-13 16:38:25 -07:00
num_segments = mathutil.div_ceil(self._data_size, self._segment_size)
if blocknum < num_segments-1:
size = self._segment_size
size = self._data_size % self._segment_size
if size == 0:
size = self._segment_size
2007-07-13 14:04:49 -07:00
offset = self._offsets['data'] + blocknum * self._segment_size
2007-07-13 16:38:25 -07:00
return self._read(offset, size)
2007-07-13 14:04:49 -07:00
def _str2l(self, s):
""" split string (pulled from storage) into a list of blockids """
# Pieces are HASH_SIZE characters long; the final piece is shorter when
# len(s) is not a multiple of HASH_SIZE, and an empty s gives [].
2007-07-13 16:38:25 -07:00
return [ s[i:i+HASH_SIZE]
for i in range(0, len(s), HASH_SIZE) ]
2007-07-08 23:27:46 -07:00
def get_plaintext_hashes(self):
# Read the whole 'plaintext_hash_tree' section, which runs from its own
# offset up to the start of 'crypttext_hash_tree'.
# NOTE(review): any callback that post-processes the data before d is
# returned is not visible in this listing.
2007-07-13 14:04:49 -07:00
offset = self._offsets['plaintext_hash_tree']
size = self._offsets['crypttext_hash_tree'] - offset
d = self._read(offset, size)
return d
2007-07-08 23:27:46 -07:00
def get_crypttext_hashes(self):
# Read the whole 'crypttext_hash_tree' section, which runs from its own
# offset up to the start of 'block_hashes'.
# NOTE(review): any callback that post-processes the data before d is
# returned is not visible in this listing.
2007-07-13 14:04:49 -07:00
offset = self._offsets['crypttext_hash_tree']
size = self._offsets['block_hashes'] - offset
d = self._read(offset, size)
return d
2007-07-08 23:27:46 -07:00
def get_block_hashes(self):
# Read the whole 'block_hashes' section, which runs from its own offset up
# to the start of 'share_hashes'.
# NOTE(review): any callback that post-processes the data before d is
# returned is not visible in this listing.
2007-07-13 14:04:49 -07:00
offset = self._offsets['block_hashes']
size = self._offsets['share_hashes'] - offset
d = self._read(offset, size)
return d
2007-07-08 23:27:46 -07:00
def get_share_hashes(self):
# Read the 'share_hashes' section (up to the 'uri_extension' offset). Its
# size must be a whole number of (2+HASH_SIZE)-byte records.
# NOTE(review): the line attaching _unpack_share_hashes to d is not
# visible in this listing.
2007-07-13 14:04:49 -07:00
offset = self._offsets['share_hashes']
size = self._offsets['uri_extension'] - offset
assert size % (2+HASH_SIZE) == 0
d = self._read(offset, size)
def _unpack_share_hashes(data):
# split the section into (hashnum, hashvalue) pairs: a two-byte big-endian
# hashnum followed by HASH_SIZE bytes of hash
assert len(data) == size
hashes = []
for i in range(0, size, 2+HASH_SIZE):
hashnum = struct.unpack(">H", data[i:i+2])[0]
hashvalue = data[i+2:i+2+HASH_SIZE]
hashes.append( (hashnum, hashvalue) )
return hashes
return d
2007-07-08 23:27:46 -07:00
2007-07-13 14:04:49 -07:00
def get_uri_extension(self):
# Read the four-byte big-endian length stored at the 'uri_extension'
# offset; _got_length then reads that many bytes starting just after it.
# NOTE(review): the line attaching _got_length to d is not visible in this
# listing.
offset = self._offsets['uri_extension']
d = self._read(offset, 4)
def _got_length(data):
length = struct.unpack(">L", data)[0]
return self._read(offset+4, length)
return d
def _read(self, offset, length):
# Issue the remote "read" call for `length` bytes at `offset` and return
# its result; callers in this class treat that result as a Deferred.
return self._rref.callRemote("read", offset, length)