Merge remote-tracking branch 'origin/master' into 3527.test_create-no-mock

This commit is contained in:
Jean-Paul Calderone 2021-11-30 15:06:17 -05:00
commit c1a2964788
32 changed files with 974 additions and 366 deletions

2
.gitignore vendored
View File

@ -29,7 +29,7 @@ zope.interface-*.egg
.pc
/src/allmydata/test/plugins/dropin.cache
/_trial_temp*
**/_trial_temp*
/tmp*
/*.patch
/dist/

View File

@ -363,11 +363,11 @@ one branch contains all of the share data;
another branch contains all of the lease data;
etc.
Authorization is required for all endpoints.
An ``Authorization`` header in requests is required for all endpoints.
The standard HTTP authorization protocol is used.
The authentication *type* used is ``Tahoe-LAFS``.
The swissnum from the NURL used to locate the storage service is used as the *credentials*.
If credentials are not presented or the swissnum is not associated with a storage service then no storage processing is performed and the request receives an ``UNAUTHORIZED`` response.
If credentials are not presented or the swissnum is not associated with a storage service then no storage processing is performed and the request receives an ``401 UNAUTHORIZED`` response.
General
~~~~~~~
@ -396,17 +396,19 @@ For example::
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Either renew or create a new lease on the bucket addressed by ``storage_index``.
The details of the lease are encoded in the request body.
The renew secret and cancellation secret should be included as ``X-Tahoe-Authorization`` headers.
For example::
{"renew-secret": "abcd", "cancel-secret": "efgh"}
X-Tahoe-Authorization: lease-renew-secret <base64-lease-renew-secret>
X-Tahoe-Authorization: lease-cancel-secret <base64-lease-cancel-secret>
If the ``renew-secret`` value matches an existing lease
If the ``lease-renew-secret`` value matches an existing lease
then the expiration time of that lease will be changed to 31 days after the time of this operation.
If it does not match an existing lease
then a new lease will be created with this ``renew-secret`` which expires 31 days after the time of this operation.
then a new lease will be created with this ``lease-renew-secret`` which expires 31 days after the time of this operation.
``renew-secret`` and ``cancel-secret`` values must be 32 bytes long.
``lease-renew-secret`` and ``lease-cancel-secret`` values must be 32 bytes long.
The server treats them as opaque values.
:ref:`Share Leases` gives details about how the Tahoe-LAFS storage client constructs these values.
@ -423,8 +425,10 @@ In these cases the server takes no action and returns ``NOT FOUND``.
Discussion
``````````
We considered an alternative where ``renew-secret`` and ``cancel-secret`` are placed in query arguments on the request path.
We chose to put these values into the request body to make the URL simpler.
We considered an alternative where ``lease-renew-secret`` and ``lease-cancel-secret`` are placed in query arguments on the request path.
This increases chances of leaking secrets in logs.
Putting the secrets in the body reduces the chances of leaking secrets,
but eventually we chose headers as the least likely information to be logged.
Several behaviors here are blindly copied from the Foolscap-based storage server protocol.
@ -450,14 +454,22 @@ A lease is also created for the shares.
Details of the buckets to create are encoded in the request body.
For example::
{"renew-secret": "efgh", "cancel-secret": "ijkl",
"share-numbers": [1, 7, ...], "allocated-size": 12345}
{"share-numbers": [1, 7, ...], "allocated-size": 12345}
The request must include ``X-Tahoe-Authorization`` HTTP headers that set the various secrets—upload, lease renewal, lease cancellation—that will be later used to authorize various operations.
For example::
X-Tahoe-Authorization: lease-renew-secret <base64-lease-renew-secret>
X-Tahoe-Authorization: lease-cancel-secret <base64-lease-cancel-secret>
X-Tahoe-Authorization: upload-secret <base64-upload-secret>
The response body includes encoded information about the created buckets.
For example::
{"already-have": [1, ...], "allocated": [7, ...]}
The upload secret is an opaque _byte_ string.
Discussion
``````````
@ -482,6 +494,20 @@ The response includes ``already-have`` and ``allocated`` for two reasons:
This might be because a server has become unavailable and a remaining server needs to store more shares for the upload.
It could also just be that the client's preferred servers have changed.
Regarding upload secrets,
the goal is for uploading and aborting (see next sections) to be authenticated by more than just the storage index.
In the future, we may want to generate them in a way that allows resuming/canceling when the client has issues.
In the short term, they can just be a random byte string.
The primary security constraint is that each upload to each server has its own unique upload key,
tied to uploading that particular storage index to this particular server.
Rejected designs for upload secrets:
* Upload secret per share number.
In order to make the secret unguessable by attackers, which includes other servers,
it must contain randomness.
Randomness means there is no need to have a secret per share, since adding share-specific content to randomness doesn't actually make the secret any better.
``PATCH /v1/immutable/:storage_index/:share_number``
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
@ -498,6 +524,12 @@ If any one of these requests fails then at most 128KiB of upload work needs to b
The server must recognize when all of the data has been received and mark the share as complete
(which it can do because it was informed of the size when the storage index was initialized).
The request must include a ``X-Tahoe-Authorization`` header that includes the upload secret::
X-Tahoe-Authorization: upload-secret <base64-upload-secret>
Responses:
* When a chunk that does not complete the share is successfully uploaded the response is ``OK``.
The response body indicates the range of share data that has yet to be uploaded.
That is::
@ -522,6 +554,10 @@ The server must recognize when all of the data has been received and mark the sh
This cancels an *in-progress* upload.
The request must include a ``X-Tahoe-Authorization`` header that includes the upload secret::
X-Tahoe-Authorization: upload-secret <base64-upload-secret>
The response code:
* When the upload is still in progress and therefore the abort has succeeded,
@ -619,16 +655,16 @@ The first write operation on a mutable storage index creates it
(that is,
there is no separate "create this storage index" operation as there is for the immutable storage index type).
The request body includes the secrets necessary to rewrite to the shares
along with test, read, and write vectors for the operation.
The request must include ``X-Tahoe-Authorization`` headers with write enabler and lease secrets::
X-Tahoe-Authorization: write-enabler <base64-write-enabler-secret>
X-Tahoe-Authorization: lease-cancel-secret <base64-lease-cancel-secret>
X-Tahoe-Authorization: lease-renew-secret <base64-lease-renew-secret>
The request body includes test, read, and write vectors for the operation.
For example::
{
"secrets": {
"write-enabler": "abcd",
"lease-renew": "efgh",
"lease-cancel": "ijkl"
},
"test-write-vectors": {
0: {
"test": [{
@ -694,8 +730,12 @@ Immutable Data
1. Create a bucket for storage index ``AAAAAAAAAAAAAAAA`` to hold two immutable shares, discovering that share ``1`` was already uploaded::
POST /v1/immutable/AAAAAAAAAAAAAAAA
{"renew-secret": "efgh", "cancel-secret": "ijkl",
"share-numbers": [1, 7], "allocated-size": 48}
Authorization: Tahoe-LAFS nurl-swissnum
X-Tahoe-Authorization: lease-renew-secret efgh
X-Tahoe-Authorization: lease-cancel-secret jjkl
X-Tahoe-Authorization: upload-secret xyzf
{"share-numbers": [1, 7], "allocated-size": 48}
200 OK
{"already-have": [1], "allocated": [7]}
@ -703,26 +743,34 @@ Immutable Data
#. Upload the content for immutable share ``7``::
PATCH /v1/immutable/AAAAAAAAAAAAAAAA/7
Authorization: Tahoe-LAFS nurl-swissnum
Content-Range: bytes 0-15/48
X-Tahoe-Authorization: upload-secret xyzf
<first 16 bytes of share data>
200 OK
PATCH /v1/immutable/AAAAAAAAAAAAAAAA/7
Authorization: Tahoe-LAFS nurl-swissnum
Content-Range: bytes 16-31/48
X-Tahoe-Authorization: upload-secret xyzf
<second 16 bytes of share data>
200 OK
PATCH /v1/immutable/AAAAAAAAAAAAAAAA/7
Authorization: Tahoe-LAFS nurl-swissnum
Content-Range: bytes 32-47/48
X-Tahoe-Authorization: upload-secret xyzf
<final 16 bytes of share data>
201 CREATED
#. Download the content of the previously uploaded immutable share ``7``::
GET /v1/immutable/AAAAAAAAAAAAAAAA?share=7&offset=0&size=48
GET /v1/immutable/AAAAAAAAAAAAAAAA?share=7
Authorization: Tahoe-LAFS nurl-swissnum
Range: bytes=0-47
200 OK
<complete 48 bytes of previously uploaded data>
@ -730,7 +778,9 @@ Immutable Data
#. Renew the lease on all immutable shares in bucket ``AAAAAAAAAAAAAAAA``::
PUT /v1/lease/AAAAAAAAAAAAAAAA
{"renew-secret": "efgh", "cancel-secret": "ijkl"}
Authorization: Tahoe-LAFS nurl-swissnum
X-Tahoe-Authorization: lease-cancel-secret jjkl
X-Tahoe-Authorization: lease-renew-secret efgh
204 NO CONTENT
@ -743,12 +793,12 @@ if there is no existing share,
otherwise it will read a byte which won't match `b""`::
POST /v1/mutable/BBBBBBBBBBBBBBBB/read-test-write
Authorization: Tahoe-LAFS nurl-swissnum
X-Tahoe-Authorization: write-enabler abcd
X-Tahoe-Authorization: lease-cancel-secret efgh
X-Tahoe-Authorization: lease-renew-secret ijkl
{
"secrets": {
"write-enabler": "abcd",
"lease-renew": "efgh",
"lease-cancel": "ijkl"
},
"test-write-vectors": {
3: {
"test": [{
@ -775,12 +825,12 @@ otherwise it will read a byte which won't match `b""`::
#. Safely rewrite the contents of a known version of mutable share number ``3`` (or fail)::
POST /v1/mutable/BBBBBBBBBBBBBBBB/read-test-write
Authorization: Tahoe-LAFS nurl-swissnum
X-Tahoe-Authorization: write-enabler abcd
X-Tahoe-Authorization: lease-cancel-secret efgh
X-Tahoe-Authorization: lease-renew-secret ijkl
{
"secrets": {
"write-enabler": "abcd",
"lease-renew": "efgh",
"lease-cancel": "ijkl"
},
"test-write-vectors": {
3: {
"test": [{
@ -807,12 +857,16 @@ otherwise it will read a byte which won't match `b""`::
#. Download the contents of share number ``3``::
GET /v1/mutable/BBBBBBBBBBBBBBBB?share=3&offset=0&size=10
Authorization: Tahoe-LAFS nurl-swissnum
<complete 16 bytes of previously uploaded data>
#. Renew the lease on previously uploaded mutable share in slot ``BBBBBBBBBBBBBBBB``::
PUT /v1/lease/BBBBBBBBBBBBBBBB
{"renew-secret": "efgh", "cancel-secret": "ijkl"}
Authorization: Tahoe-LAFS nurl-swissnum
X-Tahoe-Authorization: lease-cancel-secret efgh
X-Tahoe-Authorization: lease-renew-secret ijkl
204 NO CONTENT

View File

@ -35,6 +35,9 @@ from allmydata.test.common import (
if sys.platform.startswith('win'):
pytest.skip('Skipping Tor tests on Windows', allow_module_level=True)
if PY2:
pytest.skip('Skipping Tor tests on Python 2 because dependencies are hard to come by', allow_module_level=True)
@pytest_twisted.inlineCallbacks
def test_onion_service_storage(reactor, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl):
yield _create_anonymous_node(reactor, 'carol', 8008, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl)

View File

@ -0,0 +1 @@
If uploading an immutable hasn't had a write for 30 minutes, the storage server will abort the upload.

0
newsfragments/3820.minor Normal file
View File

0
newsfragments/3830.minor Normal file
View File

0
newsfragments/3832.minor Normal file
View File

0
newsfragments/3833.minor Normal file
View File

0
newsfragments/3834.minor Normal file
View File

0
newsfragments/3835.minor Normal file
View File

0
newsfragments/3836.minor Normal file
View File

1
newsfragments/3837.other Normal file
View File

@ -0,0 +1 @@
Tahoe-LAFS no longer runs its Tor integration test suite on Python 2 due to the increased complexity of obtaining compatible versions of necessary dependencies.

0
newsfragments/3838.minor Normal file
View File

0
newsfragments/3842.minor Normal file
View File

View File

@ -141,7 +141,9 @@ def write_introducer(basedir, petname, furl):
"""
if isinstance(furl, bytes):
furl = furl.decode("utf-8")
basedir.child(b"private").child(b"introducers.yaml").setContent(
private = basedir.child(b"private")
private.makedirs(ignoreExistingDirectory=True)
private.child(b"introducers.yaml").setContent(
safe_dump({
"introducers": {
petname: {

View File

@ -15,15 +15,22 @@ try:
except ImportError:
pass
# do not import any allmydata modules at this level. Do that from inside
# individual functions instead.
import struct, time, os, sys
from twisted.python import usage, failure
from twisted.internet import defer
from foolscap.logging import cli as foolscap_cli
from allmydata.scripts.common import BaseOptions
from allmydata.scripts.common import BaseOptions
from allmydata import uri
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import ShareFile
from allmydata.mutable.layout import unpack_share
from allmydata.mutable.layout import MDMFSlotReadProxy
from allmydata.mutable.common import NeedMoreDataError
from allmydata.immutable.layout import ReadBucketProxy
from allmydata.util import base32
from allmydata.util.encodingutil import quote_output
class DumpOptions(BaseOptions):
def getSynopsis(self):
@ -56,13 +63,11 @@ def dump_share(options):
# check the version, to see if we have a mutable or immutable share
print("share filename: %s" % quote_output(options['filename']), file=out)
f = open(options['filename'], "rb")
prefix = f.read(32)
f.close()
if prefix == MutableShareFile.MAGIC:
return dump_mutable_share(options)
# otherwise assume it's immutable
return dump_immutable_share(options)
with open(options['filename'], "rb") as f:
if MutableShareFile.is_valid_header(f.read(32)):
return dump_mutable_share(options)
# otherwise assume it's immutable
return dump_immutable_share(options)
def dump_immutable_share(options):
from allmydata.storage.immutable import ShareFile
@ -170,7 +175,7 @@ def dump_immutable_lease_info(f, out):
leases = list(f.get_leases())
if leases:
for i,lease in enumerate(leases):
when = format_expiration_time(lease.expiration_time)
when = format_expiration_time(lease.get_expiration_time())
print(" Lease #%d: owner=%d, expire in %s" \
% (i, lease.owner_num, when), file=out)
else:
@ -223,7 +228,7 @@ def dump_mutable_share(options):
print(file=out)
print(" Lease #%d:" % leasenum, file=out)
print(" ownerid: %d" % lease.owner_num, file=out)
when = format_expiration_time(lease.expiration_time)
when = format_expiration_time(lease.get_expiration_time())
print(" expires in %s" % when, file=out)
print(" renew_secret: %s" % str(base32.b2a(lease.renew_secret), "utf-8"), file=out)
print(" cancel_secret: %s" % str(base32.b2a(lease.cancel_secret), "utf-8"), file=out)
@ -712,125 +717,122 @@ def call(c, *args, **kwargs):
return results[0]
def describe_share(abs_sharefile, si_s, shnum_s, now, out):
from allmydata import uri
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import ShareFile
from allmydata.mutable.layout import unpack_share
from allmydata.mutable.common import NeedMoreDataError
from allmydata.immutable.layout import ReadBucketProxy
from allmydata.util import base32
from allmydata.util.encodingutil import quote_output
import struct
f = open(abs_sharefile, "rb")
prefix = f.read(32)
if prefix == MutableShareFile.MAGIC:
# mutable share
m = MutableShareFile(abs_sharefile)
WE, nodeid = m._read_write_enabler_and_nodeid(f)
data_length = m._read_data_length(f)
expiration_time = min( [lease.expiration_time
for (i,lease) in m._enumerate_leases(f)] )
expiration = max(0, expiration_time - now)
share_type = "unknown"
f.seek(m.DATA_OFFSET)
version = f.read(1)
if version == b"\x00":
# this slot contains an SMDF share
share_type = "SDMF"
elif version == b"\x01":
share_type = "MDMF"
if share_type == "SDMF":
f.seek(m.DATA_OFFSET)
data = f.read(min(data_length, 2000))
try:
pieces = unpack_share(data)
except NeedMoreDataError as e:
# retry once with the larger size
size = e.needed_bytes
f.seek(m.DATA_OFFSET)
data = f.read(min(data_length, size))
pieces = unpack_share(data)
(seqnum, root_hash, IV, k, N, segsize, datalen,
pubkey, signature, share_hash_chain, block_hash_tree,
share_data, enc_privkey) = pieces
print("SDMF %s %d/%d %d #%d:%s %d %s" % \
(si_s, k, N, datalen,
seqnum, str(base32.b2a(root_hash), "utf-8"),
expiration, quote_output(abs_sharefile)), file=out)
elif share_type == "MDMF":
from allmydata.mutable.layout import MDMFSlotReadProxy
fake_shnum = 0
# TODO: factor this out with dump_MDMF_share()
class ShareDumper(MDMFSlotReadProxy):
def _read(self, readvs, force_remote=False, queue=False):
data = []
for (where,length) in readvs:
f.seek(m.DATA_OFFSET+where)
data.append(f.read(length))
return defer.succeed({fake_shnum: data})
p = ShareDumper(None, "fake-si", fake_shnum)
def extract(func):
stash = []
# these methods return Deferreds, but we happen to know that
# they run synchronously when not actually talking to a
# remote server
d = func()
d.addCallback(stash.append)
return stash[0]
verinfo = extract(p.get_verinfo)
(seqnum, root_hash, salt_to_use, segsize, datalen, k, N, prefix,
offsets) = verinfo
print("MDMF %s %d/%d %d #%d:%s %d %s" % \
(si_s, k, N, datalen,
seqnum, str(base32.b2a(root_hash), "utf-8"),
expiration, quote_output(abs_sharefile)), file=out)
with open(abs_sharefile, "rb") as f:
prefix = f.read(32)
if MutableShareFile.is_valid_header(prefix):
_describe_mutable_share(abs_sharefile, f, now, si_s, out)
elif ShareFile.is_valid_header(prefix):
_describe_immutable_share(abs_sharefile, now, si_s, out)
else:
print("UNKNOWN mutable %s" % quote_output(abs_sharefile), file=out)
print("UNKNOWN really-unknown %s" % quote_output(abs_sharefile), file=out)
elif struct.unpack(">L", prefix[:4]) == (1,):
# immutable
def _describe_mutable_share(abs_sharefile, f, now, si_s, out):
# mutable share
m = MutableShareFile(abs_sharefile)
WE, nodeid = m._read_write_enabler_and_nodeid(f)
data_length = m._read_data_length(f)
expiration_time = min( [lease.get_expiration_time()
for (i,lease) in m._enumerate_leases(f)] )
expiration = max(0, expiration_time - now)
class ImmediateReadBucketProxy(ReadBucketProxy):
def __init__(self, sf):
self.sf = sf
ReadBucketProxy.__init__(self, None, None, "")
def __repr__(self):
return "<ImmediateReadBucketProxy>"
def _read(self, offset, size):
return defer.succeed(sf.read_share_data(offset, size))
share_type = "unknown"
f.seek(m.DATA_OFFSET)
version = f.read(1)
if version == b"\x00":
# this slot contains an SMDF share
share_type = "SDMF"
elif version == b"\x01":
share_type = "MDMF"
# use a ReadBucketProxy to parse the bucket and find the uri extension
sf = ShareFile(abs_sharefile)
bp = ImmediateReadBucketProxy(sf)
if share_type == "SDMF":
f.seek(m.DATA_OFFSET)
expiration_time = min( [lease.expiration_time
for lease in sf.get_leases()] )
expiration = max(0, expiration_time - now)
# Read at least the mutable header length, if possible. If there's
# less data than that in the share, don't try to read more (we won't
# be able to unpack the header in this case but we surely don't want
# to try to unpack bytes *following* the data section as if they were
# header data). Rather than 2000 we could use HEADER_LENGTH from
# allmydata/mutable/layout.py, probably.
data = f.read(min(data_length, 2000))
UEB_data = call(bp.get_uri_extension)
unpacked = uri.unpack_extension_readable(UEB_data)
try:
pieces = unpack_share(data)
except NeedMoreDataError as e:
# retry once with the larger size
size = e.needed_bytes
f.seek(m.DATA_OFFSET)
data = f.read(min(data_length, size))
pieces = unpack_share(data)
(seqnum, root_hash, IV, k, N, segsize, datalen,
pubkey, signature, share_hash_chain, block_hash_tree,
share_data, enc_privkey) = pieces
k = unpacked["needed_shares"]
N = unpacked["total_shares"]
filesize = unpacked["size"]
ueb_hash = unpacked["UEB_hash"]
print("SDMF %s %d/%d %d #%d:%s %d %s" % \
(si_s, k, N, datalen,
seqnum, str(base32.b2a(root_hash), "utf-8"),
expiration, quote_output(abs_sharefile)), file=out)
elif share_type == "MDMF":
fake_shnum = 0
# TODO: factor this out with dump_MDMF_share()
class ShareDumper(MDMFSlotReadProxy):
def _read(self, readvs, force_remote=False, queue=False):
data = []
for (where,length) in readvs:
f.seek(m.DATA_OFFSET+where)
data.append(f.read(length))
return defer.succeed({fake_shnum: data})
print("CHK %s %d/%d %d %s %d %s" % (si_s, k, N, filesize,
str(ueb_hash, "utf-8"), expiration,
quote_output(abs_sharefile)), file=out)
p = ShareDumper(None, "fake-si", fake_shnum)
def extract(func):
stash = []
# these methods return Deferreds, but we happen to know that
# they run synchronously when not actually talking to a
# remote server
d = func()
d.addCallback(stash.append)
return stash[0]
verinfo = extract(p.get_verinfo)
(seqnum, root_hash, salt_to_use, segsize, datalen, k, N, prefix,
offsets) = verinfo
print("MDMF %s %d/%d %d #%d:%s %d %s" % \
(si_s, k, N, datalen,
seqnum, str(base32.b2a(root_hash), "utf-8"),
expiration, quote_output(abs_sharefile)), file=out)
else:
print("UNKNOWN really-unknown %s" % quote_output(abs_sharefile), file=out)
print("UNKNOWN mutable %s" % quote_output(abs_sharefile), file=out)
def _describe_immutable_share(abs_sharefile, now, si_s, out):
class ImmediateReadBucketProxy(ReadBucketProxy):
def __init__(self, sf):
self.sf = sf
ReadBucketProxy.__init__(self, None, None, "")
def __repr__(self):
return "<ImmediateReadBucketProxy>"
def _read(self, offset, size):
return defer.succeed(sf.read_share_data(offset, size))
# use a ReadBucketProxy to parse the bucket and find the uri extension
sf = ShareFile(abs_sharefile)
bp = ImmediateReadBucketProxy(sf)
expiration_time = min(lease.get_expiration_time()
for lease in sf.get_leases())
expiration = max(0, expiration_time - now)
UEB_data = call(bp.get_uri_extension)
unpacked = uri.unpack_extension_readable(UEB_data)
k = unpacked["needed_shares"]
N = unpacked["total_shares"]
filesize = unpacked["size"]
ueb_hash = unpacked["UEB_hash"]
print("CHK %s %d/%d %d %s %d %s" % (si_s, k, N, filesize,
str(ueb_hash, "utf-8"), expiration,
quote_output(abs_sharefile)), file=out)
f.close()
def catalog_shares(options):
from allmydata.util.encodingutil import listdir_unicode, quote_output
@ -933,34 +935,35 @@ def corrupt_share(options):
f.write(d)
f.close()
f = open(fn, "rb")
prefix = f.read(32)
f.close()
if prefix == MutableShareFile.MAGIC:
# mutable
m = MutableShareFile(fn)
f = open(fn, "rb")
f.seek(m.DATA_OFFSET)
data = f.read(2000)
# make sure this slot contains an SMDF share
assert data[0:1] == b"\x00", "non-SDMF mutable shares not supported"
f.close()
with open(fn, "rb") as f:
prefix = f.read(32)
(version, ig_seqnum, ig_roothash, ig_IV, ig_k, ig_N, ig_segsize,
ig_datalen, offsets) = unpack_header(data)
if MutableShareFile.is_valid_header(prefix):
# mutable
m = MutableShareFile(fn)
with open(fn, "rb") as f:
f.seek(m.DATA_OFFSET)
# Read enough data to get a mutable header to unpack.
data = f.read(2000)
# make sure this slot contains an SMDF share
assert data[0:1] == b"\x00", "non-SDMF mutable shares not supported"
f.close()
assert version == 0, "we only handle v0 SDMF files"
start = m.DATA_OFFSET + offsets["share_data"]
end = m.DATA_OFFSET + offsets["enc_privkey"]
flip_bit(start, end)
else:
# otherwise assume it's immutable
f = ShareFile(fn)
bp = ReadBucketProxy(None, None, '')
offsets = bp._parse_offsets(f.read_share_data(0, 0x24))
start = f._data_offset + offsets["data"]
end = f._data_offset + offsets["plaintext_hash_tree"]
flip_bit(start, end)
(version, ig_seqnum, ig_roothash, ig_IV, ig_k, ig_N, ig_segsize,
ig_datalen, offsets) = unpack_header(data)
assert version == 0, "we only handle v0 SDMF files"
start = m.DATA_OFFSET + offsets["share_data"]
end = m.DATA_OFFSET + offsets["enc_privkey"]
flip_bit(start, end)
else:
# otherwise assume it's immutable
f = ShareFile(fn)
bp = ReadBucketProxy(None, None, '')
offsets = bp._parse_offsets(f.read_share_data(0, 0x24))
start = f._data_offset + offsets["data"]
end = f._data_offset + offsets["plaintext_hash_tree"]
flip_bit(start, end)

View File

@ -24,7 +24,6 @@ from allmydata.interfaces import (
)
from allmydata.util import base32, fileutil, log
from allmydata.util.assertutil import precondition
from allmydata.util.hashutil import timing_safe_compare
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.common import UnknownImmutableContainerVersionError
@ -57,6 +56,21 @@ class ShareFile(object):
LEASE_SIZE = struct.calcsize(">L32s32sL")
sharetype = "immutable"
@classmethod
def is_valid_header(cls, header):
# type: (bytes) -> bool
"""
Determine if the given bytes constitute a valid header for this type of
container.
:param header: Some bytes from the beginning of a container.
:return: ``True`` if the bytes could belong to this container,
``False`` otherwise.
"""
(version,) = struct.unpack(">L", header[:4])
return version == 1
def __init__(self, filename, max_size=None, create=False):
""" If max_size is not None then I won't allow more than max_size to be written to me. If create=True and max_size must not be None. """
precondition((max_size is not None) or (not create), max_size, create)
@ -144,7 +158,7 @@ class ShareFile(object):
for i in range(num_leases):
data = f.read(self.LEASE_SIZE)
if data:
yield LeaseInfo().from_immutable_data(data)
yield LeaseInfo.from_immutable_data(data)
def add_lease(self, lease_info):
with open(self.home, 'rb+') as f:
@ -152,13 +166,24 @@ class ShareFile(object):
self._write_lease_record(f, num_leases, lease_info)
self._write_num_leases(f, num_leases+1)
def renew_lease(self, renew_secret, new_expire_time):
def renew_lease(self, renew_secret, new_expire_time, allow_backdate=False):
# type: (bytes, int, bool) -> None
"""
Update the expiration time on an existing lease.
:param allow_backdate: If ``True`` then allow the new expiration time
to be before the current expiration time. Otherwise, make no
change when this is the case.
:raise IndexError: If there is no lease matching the given renew
secret.
"""
for i,lease in enumerate(self.get_leases()):
if timing_safe_compare(lease.renew_secret, renew_secret):
if lease.is_renew_secret(renew_secret):
# yup. See if we need to update the owner time.
if new_expire_time > lease.expiration_time:
if allow_backdate or new_expire_time > lease.get_expiration_time():
# yes
lease.expiration_time = new_expire_time
lease = lease.renew(new_expire_time)
with open(self.home, 'rb+') as f:
self._write_lease_record(f, i, lease)
return
@ -167,7 +192,7 @@ class ShareFile(object):
def add_or_renew_lease(self, lease_info):
try:
self.renew_lease(lease_info.renew_secret,
lease_info.expiration_time)
lease_info.get_expiration_time())
except IndexError:
self.add_lease(lease_info)
@ -183,7 +208,7 @@ class ShareFile(object):
leases = list(self.get_leases())
num_leases_removed = 0
for i,lease in enumerate(leases):
if timing_safe_compare(lease.cancel_secret, cancel_secret):
if lease.is_cancel_secret(cancel_secret):
leases[i] = None
num_leases_removed += 1
if not num_leases_removed:
@ -208,7 +233,7 @@ class ShareFile(object):
@implementer(RIBucketWriter)
class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
def __init__(self, ss, incominghome, finalhome, max_size, lease_info):
def __init__(self, ss, incominghome, finalhome, max_size, lease_info, clock):
self.ss = ss
self.incominghome = incominghome
self.finalhome = finalhome
@ -220,12 +245,16 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
# added by simultaneous uploaders
self._sharefile.add_lease(lease_info)
self._already_written = RangeMap()
self._clock = clock
self._timeout = clock.callLater(30 * 60, self._abort_due_to_timeout)
def allocated_size(self):
return self._max_size
def remote_write(self, offset, data):
start = time.time()
# Delay the timeout, since we received data:
self._timeout.reset(30 * 60)
start = self._clock.seconds()
precondition(not self.closed)
if self.throw_out_all_data:
return
@ -243,12 +272,16 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
self._sharefile.write_share_data(offset, data)
self._already_written.set(True, offset, end)
self.ss.add_latency("write", time.time() - start)
self.ss.add_latency("write", self._clock.seconds() - start)
self.ss.count("write")
def remote_close(self):
self.close()
def close(self):
precondition(not self.closed)
start = time.time()
self._timeout.cancel()
start = self._clock.seconds()
fileutil.make_dirs(os.path.dirname(self.finalhome))
fileutil.rename(self.incominghome, self.finalhome)
@ -281,20 +314,28 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
filelen = os.stat(self.finalhome)[stat.ST_SIZE]
self.ss.bucket_writer_closed(self, filelen)
self.ss.add_latency("close", time.time() - start)
self.ss.add_latency("close", self._clock.seconds() - start)
self.ss.count("close")
def disconnected(self):
if not self.closed:
self._abort()
self.abort()
def _abort_due_to_timeout(self):
"""
Called if we run out of time.
"""
log.msg("storage: aborting sharefile %s due to timeout" % self.incominghome,
facility="tahoe.storage", level=log.UNUSUAL)
self.abort()
def remote_abort(self):
log.msg("storage: aborting sharefile %s" % self.incominghome,
facility="tahoe.storage", level=log.UNUSUAL)
self._abort()
self.abort()
self.ss.count("abort")
def _abort(self):
def abort(self):
if self.closed:
return
@ -312,6 +353,10 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
self.closed = True
self.ss.bucket_writer_closed(self, 0)
# Cancel timeout if it wasn't already cancelled.
if self._timeout.active():
self._timeout.cancel()
@implementer(RIBucketReader)
class BucketReader(Referenceable): # type: ignore # warner/foolscap#78

View File

@ -13,52 +13,132 @@ if PY2:
import struct, time
import attr
from allmydata.util.hashutil import timing_safe_compare
@attr.s(frozen=True)
class LeaseInfo(object):
def __init__(self, owner_num=None, renew_secret=None, cancel_secret=None,
expiration_time=None, nodeid=None):
self.owner_num = owner_num
self.renew_secret = renew_secret
self.cancel_secret = cancel_secret
self.expiration_time = expiration_time
if nodeid is not None:
assert isinstance(nodeid, bytes)
assert len(nodeid) == 20
self.nodeid = nodeid
"""
Represent the details of one lease, a marker which is intended to inform
the storage server how long to store a particular share.
"""
owner_num = attr.ib(default=None)
# Don't put secrets into the default string representation. This makes it
# slightly less likely the secrets will accidentally be leaked to
# someplace they're not meant to be.
renew_secret = attr.ib(default=None, repr=False)
cancel_secret = attr.ib(default=None, repr=False)
_expiration_time = attr.ib(default=None)
nodeid = attr.ib(default=None)
@nodeid.validator
def _validate_nodeid(self, attribute, value):
if value is not None:
if not isinstance(value, bytes):
raise ValueError(
"nodeid value must be bytes, not {!r}".format(value),
)
if len(value) != 20:
raise ValueError(
"nodeid value must be 20 bytes long, not {!r}".format(value),
)
return None
def get_expiration_time(self):
return self.expiration_time
# type: () -> float
"""
Retrieve a POSIX timestamp representing the time at which this lease is
set to expire.
"""
return self._expiration_time
def renew(self, new_expire_time):
# type: (float) -> LeaseInfo
"""
Create a new lease the same as this one but with a new expiration time.
:param new_expire_time: The new expiration time.
:return: The new lease info.
"""
return attr.assoc(
self,
_expiration_time=new_expire_time,
)
def is_renew_secret(self, candidate_secret):
# type: (bytes) -> bool
"""
Check a string to see if it is the correct renew secret.
:return: ``True`` if it is the correct renew secret, ``False``
otherwise.
"""
return timing_safe_compare(self.renew_secret, candidate_secret)
def is_cancel_secret(self, candidate_secret):
# type: (bytes) -> bool
"""
Check a string to see if it is the correct cancel secret.
:return: ``True`` if it is the correct cancel secret, ``False``
otherwise.
"""
return timing_safe_compare(self.cancel_secret, candidate_secret)
def get_grant_renew_time_time(self):
# hack, based upon fixed 31day expiration period
return self.expiration_time - 31*24*60*60
return self._expiration_time - 31*24*60*60
def get_age(self):
return time.time() - self.get_grant_renew_time_time()
def from_immutable_data(self, data):
(self.owner_num,
self.renew_secret,
self.cancel_secret,
self.expiration_time) = struct.unpack(">L32s32sL", data)
self.nodeid = None
return self
@classmethod
def from_immutable_data(cls, data):
"""
Create a new instance from the encoded data given.
:param data: A lease serialized using the immutable-share-file format.
"""
names = [
"owner_num",
"renew_secret",
"cancel_secret",
"expiration_time",
]
values = struct.unpack(">L32s32sL", data)
return cls(nodeid=None, **dict(zip(names, values)))
def to_immutable_data(self):
return struct.pack(">L32s32sL",
self.owner_num,
self.renew_secret, self.cancel_secret,
int(self.expiration_time))
int(self._expiration_time))
def to_mutable_data(self):
return struct.pack(">LL32s32s20s",
self.owner_num,
int(self.expiration_time),
int(self._expiration_time),
self.renew_secret, self.cancel_secret,
self.nodeid)
def from_mutable_data(self, data):
(self.owner_num,
self.expiration_time,
self.renew_secret, self.cancel_secret,
self.nodeid) = struct.unpack(">LL32s32s20s", data)
return self
@classmethod
def from_mutable_data(cls, data):
"""
Create a new instance from the encoded data given.
:param data: A lease serialized using the mutable-share-file format.
"""
names = [
"owner_num",
"expiration_time",
"renew_secret",
"cancel_secret",
"nodeid",
]
values = struct.unpack(">LL32s32s20s", data)
return cls(**dict(zip(names, values)))

View File

@ -67,6 +67,20 @@ class MutableShareFile(object):
MAX_SIZE = MAX_MUTABLE_SHARE_SIZE
# TODO: decide upon a policy for max share size
@classmethod
def is_valid_header(cls, header):
# type: (bytes) -> bool
"""
Determine if the given bytes constitute a valid header for this type of
container.
:param header: Some bytes from the beginning of a container.
:return: ``True`` if the bytes could belong to this container,
``False`` otherwise.
"""
return header.startswith(cls.MAGIC)
def __init__(self, filename, parent=None):
self.home = filename
if os.path.exists(self.home):
@ -77,7 +91,7 @@ class MutableShareFile(object):
write_enabler_nodeid, write_enabler,
data_length, extra_least_offset) = \
struct.unpack(">32s20s32sQQ", data)
if magic != self.MAGIC:
if not self.is_valid_header(data):
msg = "sharefile %s had magic '%r' but we wanted '%r'" % \
(filename, magic, self.MAGIC)
raise UnknownMutableContainerVersionError(msg)
@ -253,7 +267,7 @@ class MutableShareFile(object):
f.seek(offset)
assert f.tell() == offset
data = f.read(self.LEASE_SIZE)
lease_info = LeaseInfo().from_mutable_data(data)
lease_info = LeaseInfo.from_mutable_data(data)
if lease_info.owner_num == 0:
return None
return lease_info
@ -298,15 +312,26 @@ class MutableShareFile(object):
else:
self._write_lease_record(f, num_lease_slots, lease_info)
def renew_lease(self, renew_secret, new_expire_time):
def renew_lease(self, renew_secret, new_expire_time, allow_backdate=False):
# type: (bytes, int, bool) -> None
"""
Update the expiration time on an existing lease.
:param allow_backdate: If ``True`` then allow the new expiration time
to be before the current expiration time. Otherwise, make no
change when this is the case.
:raise IndexError: If there is no lease matching the given renew
secret.
"""
accepting_nodeids = set()
with open(self.home, 'rb+') as f:
for (leasenum,lease) in self._enumerate_leases(f):
if timing_safe_compare(lease.renew_secret, renew_secret):
if lease.is_renew_secret(renew_secret):
# yup. See if we need to update the owner time.
if new_expire_time > lease.expiration_time:
if allow_backdate or new_expire_time > lease.get_expiration_time():
# yes
lease.expiration_time = new_expire_time
lease = lease.renew(new_expire_time)
self._write_lease_record(f, leasenum, lease)
return
accepting_nodeids.add(lease.nodeid)
@ -324,7 +349,7 @@ class MutableShareFile(object):
precondition(lease_info.owner_num != 0) # 0 means "no lease here"
try:
self.renew_lease(lease_info.renew_secret,
lease_info.expiration_time)
lease_info.get_expiration_time())
except IndexError:
self.add_lease(lease_info)
@ -346,7 +371,7 @@ class MutableShareFile(object):
with open(self.home, 'rb+') as f:
for (leasenum,lease) in self._enumerate_leases(f):
accepting_nodeids.add(lease.nodeid)
if timing_safe_compare(lease.cancel_secret, cancel_secret):
if lease.is_cancel_secret(cancel_secret):
self._write_lease_record(f, leasenum, blank_lease)
modified += 1
else:
@ -377,7 +402,7 @@ class MutableShareFile(object):
write_enabler_nodeid, write_enabler,
data_length, extra_least_offset) = \
struct.unpack(">32s20s32sQQ", data)
assert magic == self.MAGIC
assert self.is_valid_header(data)
return (write_enabler, write_enabler_nodeid)
def readv(self, readv):
@ -454,4 +479,3 @@ def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
ms.create(my_nodeid, write_enabler)
del ms
return MutableShareFile(filename, parent)

View File

@ -14,12 +14,13 @@ if PY2:
else:
from typing import Dict
import os, re, struct, time
import os, re
import six
from foolscap.api import Referenceable
from foolscap.ipb import IRemoteReference
from twisted.application import service
from twisted.internet import reactor
from zope.interface import implementer
from allmydata.interfaces import RIStorageServer, IStatsProducer
@ -57,6 +58,9 @@ DEFAULT_RENEWAL_TIME = 31 * 24 * 60 * 60
@implementer(RIStorageServer, IStatsProducer)
class StorageServer(service.MultiService, Referenceable):
"""
A filesystem-based implementation of ``RIStorageServer``.
"""
name = 'storage'
LeaseCheckerClass = LeaseCheckingCrawler
@ -68,7 +72,7 @@ class StorageServer(service.MultiService, Referenceable):
expiration_override_lease_duration=None,
expiration_cutoff_date=None,
expiration_sharetypes=("mutable", "immutable"),
get_current_time=time.time):
clock=reactor):
service.MultiService.__init__(self)
assert isinstance(nodeid, bytes)
assert len(nodeid) == 20
@ -119,7 +123,7 @@ class StorageServer(service.MultiService, Referenceable):
expiration_cutoff_date,
expiration_sharetypes)
self.lease_checker.setServiceParent(self)
self._get_current_time = get_current_time
self._clock = clock
# Currently being-written Bucketwriters. For Foolscap, lifetime is tied
# to connection: when disconnection happens, the BucketWriters are
@ -132,6 +136,12 @@ class StorageServer(service.MultiService, Referenceable):
# Canaries and disconnect markers for BucketWriters created via Foolscap:
self._bucket_writer_disconnect_markers = {} # type: Dict[BucketWriter,(IRemoteReference, object)]
def stopService(self):
# Cancel any in-progress uploads:
for bw in list(self._bucket_writers.values()):
bw.disconnected()
return service.MultiService.stopService(self)
def __repr__(self):
return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
@ -277,14 +287,19 @@ class StorageServer(service.MultiService, Referenceable):
def _allocate_buckets(self, storage_index,
renew_secret, cancel_secret,
sharenums, allocated_size,
owner_num=0):
owner_num=0, renew_leases=True):
"""
Generic bucket allocation API.
:param bool renew_leases: If and only if this is ``True`` then renew a
secret-matching lease on (or, if none match, add a new lease to)
existing shares in this bucket. Any *new* shares are given a new
lease regardless.
"""
# owner_num is not for clients to set, but rather it should be
# curried into the PersonalStorageServer instance that is dedicated
# to a particular owner.
start = self._get_current_time()
start = self._clock.seconds()
self.count("allocate")
alreadygot = set()
bucketwriters = {} # k: shnum, v: BucketWriter
@ -297,7 +312,7 @@ class StorageServer(service.MultiService, Referenceable):
# goes into the share files themselves. It could also be put into a
# separate database. Note that the lease should not be added until
# the BucketWriter has been closed.
expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME
expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
lease_info = LeaseInfo(owner_num,
renew_secret, cancel_secret,
expire_time, self.my_nodeid)
@ -319,8 +334,9 @@ class StorageServer(service.MultiService, Referenceable):
# file, they'll want us to hold leases for this file.
for (shnum, fn) in self._get_bucket_shares(storage_index):
alreadygot.add(shnum)
sf = ShareFile(fn)
sf.add_or_renew_lease(lease_info)
if renew_leases:
sf = ShareFile(fn)
sf.add_or_renew_lease(lease_info)
for shnum in sharenums:
incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
@ -337,7 +353,8 @@ class StorageServer(service.MultiService, Referenceable):
elif (not limited) or (remaining_space >= max_space_per_bucket):
# ok! we need to create the new share file.
bw = BucketWriter(self, incominghome, finalhome,
max_space_per_bucket, lease_info)
max_space_per_bucket, lease_info,
clock=self._clock)
if self.no_storage:
bw.throw_out_all_data = True
bucketwriters[shnum] = bw
@ -351,7 +368,7 @@ class StorageServer(service.MultiService, Referenceable):
if bucketwriters:
fileutil.make_dirs(os.path.join(self.sharedir, si_dir))
self.add_latency("allocate", self._get_current_time() - start)
self.add_latency("allocate", self._clock.seconds() - start)
return alreadygot, bucketwriters
def remote_allocate_buckets(self, storage_index,
@ -361,7 +378,7 @@ class StorageServer(service.MultiService, Referenceable):
"""Foolscap-specific ``allocate_buckets()`` API."""
alreadygot, bucketwriters = self._allocate_buckets(
storage_index, renew_secret, cancel_secret, sharenums, allocated_size,
owner_num=owner_num,
owner_num=owner_num, renew_leases=True,
)
# Abort BucketWriters if disconnection happens.
for bw in bucketwriters.values():
@ -373,12 +390,12 @@ class StorageServer(service.MultiService, Referenceable):
for shnum, filename in self._get_bucket_shares(storage_index):
with open(filename, 'rb') as f:
header = f.read(32)
if header[:32] == MutableShareFile.MAGIC:
if MutableShareFile.is_valid_header(header):
sf = MutableShareFile(filename, self)
# note: if the share has been migrated, the renew_lease()
# call will throw an exception, with information to help the
# client update the lease.
elif header[:4] == struct.pack(">L", 1):
elif ShareFile.is_valid_header(header):
sf = ShareFile(filename)
else:
continue # non-sharefile
@ -386,26 +403,26 @@ class StorageServer(service.MultiService, Referenceable):
def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
owner_num=1):
start = self._get_current_time()
start = self._clock.seconds()
self.count("add-lease")
new_expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME
new_expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
lease_info = LeaseInfo(owner_num,
renew_secret, cancel_secret,
new_expire_time, self.my_nodeid)
for sf in self._iter_share_files(storage_index):
sf.add_or_renew_lease(lease_info)
self.add_latency("add-lease", self._get_current_time() - start)
self.add_latency("add-lease", self._clock.seconds() - start)
return None
def remote_renew_lease(self, storage_index, renew_secret):
start = self._get_current_time()
start = self._clock.seconds()
self.count("renew")
new_expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME
new_expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
found_buckets = False
for sf in self._iter_share_files(storage_index):
found_buckets = True
sf.renew_lease(renew_secret, new_expire_time)
self.add_latency("renew", self._get_current_time() - start)
self.add_latency("renew", self._clock.seconds() - start)
if not found_buckets:
raise IndexError("no such lease to renew")
@ -432,7 +449,7 @@ class StorageServer(service.MultiService, Referenceable):
pass
def remote_get_buckets(self, storage_index):
start = self._get_current_time()
start = self._clock.seconds()
self.count("get")
si_s = si_b2a(storage_index)
log.msg("storage: get_buckets %r" % si_s)
@ -440,7 +457,7 @@ class StorageServer(service.MultiService, Referenceable):
for shnum, filename in self._get_bucket_shares(storage_index):
bucketreaders[shnum] = BucketReader(self, filename,
storage_index, shnum)
self.add_latency("get", self._get_current_time() - start)
self.add_latency("get", self._clock.seconds() - start)
return bucketreaders
def get_leases(self, storage_index):
@ -579,10 +596,8 @@ class StorageServer(service.MultiService, Referenceable):
else:
if sharenum not in shares:
# allocate a new share
allocated_size = 2000 # arbitrary, really
share = self._allocate_slot_share(bucketdir, secrets,
sharenum,
allocated_size,
owner_num=0)
shares[sharenum] = share
shares[sharenum].writev(datav, new_length)
@ -601,7 +616,7 @@ class StorageServer(service.MultiService, Referenceable):
:return LeaseInfo: Information for a new lease for a share.
"""
ownerid = 1 # TODO
expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME
expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
lease_info = LeaseInfo(ownerid,
renew_secret, cancel_secret,
expire_time, self.my_nodeid)
@ -631,13 +646,15 @@ class StorageServer(service.MultiService, Referenceable):
Read data from shares and conditionally write some data to them.
:param bool renew_leases: If and only if this is ``True`` and the test
vectors pass then shares in this slot will also have an updated
lease applied to them.
vectors pass then shares mentioned in ``test_and_write_vectors``
that still exist after the changes are made will also have a
secret-matching lease renewed (or, if none match, a new lease
added).
See ``allmydata.interfaces.RIStorageServer`` for details about other
parameters and return value.
"""
start = self._get_current_time()
start = self._clock.seconds()
self.count("writev")
si_s = si_b2a(storage_index)
log.msg("storage: slot_writev %r" % si_s)
@ -678,7 +695,7 @@ class StorageServer(service.MultiService, Referenceable):
self._add_or_renew_leases(remaining_shares, lease_info)
# all done
self.add_latency("writev", self._get_current_time() - start)
self.add_latency("writev", self._clock.seconds() - start)
return (testv_is_good, read_data)
def remote_slot_testv_and_readv_and_writev(self, storage_index,
@ -694,7 +711,7 @@ class StorageServer(service.MultiService, Referenceable):
)
def _allocate_slot_share(self, bucketdir, secrets, sharenum,
allocated_size, owner_num=0):
owner_num=0):
(write_enabler, renew_secret, cancel_secret) = secrets
my_nodeid = self.my_nodeid
fileutil.make_dirs(bucketdir)
@ -704,7 +721,7 @@ class StorageServer(service.MultiService, Referenceable):
return share
def remote_slot_readv(self, storage_index, shares, readv):
start = self._get_current_time()
start = self._clock.seconds()
self.count("readv")
si_s = si_b2a(storage_index)
lp = log.msg("storage: slot_readv %r %r" % (si_s, shares),
@ -713,7 +730,7 @@ class StorageServer(service.MultiService, Referenceable):
# shares exist if there is a file for them
bucketdir = os.path.join(self.sharedir, si_dir)
if not os.path.isdir(bucketdir):
self.add_latency("readv", self._get_current_time() - start)
self.add_latency("readv", self._clock.seconds() - start)
return {}
datavs = {}
for sharenum_s in os.listdir(bucketdir):
@ -727,7 +744,7 @@ class StorageServer(service.MultiService, Referenceable):
datavs[sharenum] = msf.readv(readv)
log.msg("returning shares %s" % (list(datavs.keys()),),
facility="tahoe.storage", level=log.NOISY, parent=lp)
self.add_latency("readv", self._get_current_time() - start)
self.add_latency("readv", self._clock.seconds() - start)
return datavs
def remote_advise_corrupt_share(self, share_type, storage_index, shnum,

View File

@ -17,8 +17,7 @@ from allmydata.storage.immutable import ShareFile
def get_share_file(filename):
with open(filename, "rb") as f:
prefix = f.read(32)
if prefix == MutableShareFile.MAGIC:
if MutableShareFile.is_valid_header(prefix):
return MutableShareFile(filename)
# otherwise assume it's immutable
return ShareFile(filename)

View File

@ -268,8 +268,12 @@ class UseNode(object):
node_config = attr.ib(default=attr.Factory(dict))
config = attr.ib(default=None)
reactor = attr.ib(default=None)
def setUp(self):
self.assigner = SameProcessStreamEndpointAssigner()
self.assigner.setUp()
def format_config_items(config):
return "\n".join(
" = ".join((key, value))
@ -293,6 +297,23 @@ class UseNode(object):
"default",
self.introducer_furl,
)
node_config = self.node_config.copy()
if "tub.port" not in node_config:
if "tub.location" in node_config:
raise ValueError(
"UseNode fixture does not support specifying tub.location "
"without tub.port"
)
# Don't use the normal port auto-assignment logic. It produces
# collisions and makes tests fail spuriously.
tub_location, tub_endpoint = self.assigner.assign(self.reactor)
node_config.update({
"tub.port": tub_endpoint,
"tub.location": tub_location,
})
self.config = config_from_string(
self.basedir.asTextMode().path,
"tub.port",
@ -305,7 +326,7 @@ storage.plugins = {storage_plugin}
{plugin_config_section}
""".format(
storage_plugin=self.storage_plugin,
node_config=format_config_items(self.node_config),
node_config=format_config_items(node_config),
plugin_config_section=plugin_config_section,
)
)
@ -317,7 +338,7 @@ storage.plugins = {storage_plugin}
)
def cleanUp(self):
pass
self.assigner.tearDown()
def getDetails(self):
@ -1069,7 +1090,7 @@ def _corrupt_offset_of_uri_extension_to_force_short_read(data, debug=False):
def _corrupt_mutable_share_data(data, debug=False):
prefix = data[:32]
assert prefix == MutableShareFile.MAGIC, "This function is designed to corrupt mutable shares of v1, and the magic number doesn't look right: %r vs %r" % (prefix, MutableShareFile.MAGIC)
assert MutableShareFile.is_valid_header(prefix), "This function is designed to corrupt mutable shares of v1, and the magic number doesn't look right: %r vs %r" % (prefix, MutableShareFile.MAGIC)
data_offset = MutableShareFile.DATA_OFFSET
sharetype = data[data_offset:data_offset+1]
assert sharetype == b"\x00", "non-SDMF mutable shares not supported"

View File

@ -672,11 +672,14 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
"""
iv_dir = self.getdir("introducer")
if not os.path.isdir(iv_dir):
_, port_endpoint = self.port_assigner.assign(reactor)
_, web_port_endpoint = self.port_assigner.assign(reactor)
main_location_hint, main_port_endpoint = self.port_assigner.assign(reactor)
introducer_config = (
u"[node]\n"
u"nickname = introducer \N{BLACK SMILING FACE}\n" +
u"web.port = {}\n".format(port_endpoint)
u"web.port = {}\n".format(web_port_endpoint) +
u"tub.port = {}\n".format(main_port_endpoint) +
u"tub.location = {}\n".format(main_location_hint)
).encode("utf-8")
fileutil.make_dirs(iv_dir)
@ -764,13 +767,15 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
def _generate_config(self, which, basedir):
config = {}
except1 = set(range(self.numclients)) - {1}
allclients = set(range(self.numclients))
except1 = allclients - {1}
feature_matrix = {
("client", "nickname"): except1,
# client 1 has to auto-assign an address.
("node", "tub.port"): except1,
("node", "tub.location"): except1,
# Auto-assigning addresses is extremely failure prone and not
# amenable to automated testing in _this_ manner.
("node", "tub.port"): allclients,
("node", "tub.location"): allclients,
# client 0 runs a webserver and a helper
# client 3 runs a webserver but no helper
@ -852,7 +857,13 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
# connection-lost code
basedir = FilePath(self.getdir("client%d" % client_num))
basedir.makedirs()
config = "[client]\n"
config = (
"[node]\n"
"tub.location = {}\n"
"tub.port = {}\n"
"[client]\n"
).format(*self.port_assigner.assign(reactor))
if helper_furl:
config += "helper.furl = %s\n" % helper_furl
basedir.child("tahoe.cfg").setContent(config.encode("utf-8"))

View File

@ -25,6 +25,11 @@ if PY2:
from past.builtins import unicode
from six import ensure_text
try:
from typing import Dict, Callable
except ImportError:
pass
import os
from base64 import b32encode
from functools import (
@ -479,6 +484,18 @@ class GridTestMixin(object):
def set_up_grid(self, num_clients=1, num_servers=10,
client_config_hooks={}, oneshare=False):
"""
Create a Tahoe-LAFS storage grid.
:param num_clients: See ``NoNetworkGrid``
:param num_servers: See `NoNetworkGrid``
:param client_config_hooks: See ``NoNetworkGrid``
:param bool oneshare: If ``True`` then the first client node is
configured with ``n == k == happy == 1``.
:return: ``None``
"""
# self.basedir must be set
port_assigner = SameProcessStreamEndpointAssigner()
port_assigner.setUp()
@ -557,6 +574,15 @@ class GridTestMixin(object):
return sorted(shares)
def copy_shares(self, uri):
# type: (bytes) -> Dict[bytes, bytes]
"""
Read all of the share files for the given capability from the storage area
of the storage servers created by ``set_up_grid``.
:param bytes uri: A Tahoe-LAFS data capability.
:return: A ``dict`` mapping share file names to share file contents.
"""
shares = {}
for (shnum, serverid, sharefile) in self.find_uri_shares(uri):
with open(sharefile, "rb") as f:
@ -601,10 +627,15 @@ class GridTestMixin(object):
f.write(corruptdata)
def corrupt_all_shares(self, uri, corruptor, debug=False):
# type: (bytes, Callable[[bytes, bool], bytes], bool) -> None
"""
Apply ``corruptor`` to the contents of all share files associated with a
given capability and replace the share file contents with its result.
"""
for (i_shnum, i_serverid, i_sharefile) in self.find_uri_shares(uri):
with open(i_sharefile, "rb") as f:
sharedata = f.read()
corruptdata = corruptor(sharedata, debug=debug)
corruptdata = corruptor(sharedata, debug)
with open(i_sharefile, "wb") as f:
f.write(corruptdata)

View File

@ -89,6 +89,7 @@ from .common import (
UseTestPlugins,
MemoryIntroducerClient,
get_published_announcements,
UseNode,
)
from .matchers import (
MatchesSameElements,
@ -953,13 +954,14 @@ class Run(unittest.TestCase, testutil.StallMixin):
@defer.inlineCallbacks
def test_reloadable(self):
basedir = FilePath("test_client.Run.test_reloadable")
private = basedir.child("private")
private.makedirs()
from twisted.internet import reactor
dummy = "pb://wl74cyahejagspqgy4x5ukrvfnevlknt@127.0.0.1:58889/bogus"
write_introducer(basedir, "someintroducer", dummy)
basedir.child("tahoe.cfg").setContent(BASECONFIG. encode("ascii"))
c1 = yield client.create_client(basedir.path)
fixture = UseNode(None, None, FilePath(self.mktemp()), dummy, reactor=reactor)
fixture.setUp()
self.addCleanup(fixture.cleanUp)
c1 = yield fixture.create_node()
c1.setServiceParent(self.sparent)
# delay to let the service start up completely. I'm not entirely sure
@ -981,7 +983,7 @@ class Run(unittest.TestCase, testutil.StallMixin):
# also change _check_exit_trigger to use it instead of a raw
# reactor.stop, also instrument the shutdown event in an
# attribute that we can check.)
c2 = yield client.create_client(basedir.path)
c2 = yield fixture.create_node()
c2.setServiceParent(self.sparent)
yield c2.disownServiceParent()

View File

@ -14,6 +14,11 @@ if PY2:
# a previous run. This asserts that the current code is capable of decoding
# shares from a previous version.
try:
from typing import Any
except ImportError:
pass
import six
import os
from twisted.trial import unittest
@ -951,12 +956,52 @@ class Corruption(_Base, unittest.TestCase):
self.corrupt_shares_numbered(imm_uri, [2], _corruptor)
def _corrupt_set(self, ign, imm_uri, which, newvalue):
# type: (Any, bytes, int, int) -> None
"""
Replace a single byte share file number 2 for the given capability with a
new byte.
:param imm_uri: Corrupt share number 2 belonging to this capability.
:param which: The byte position to replace.
:param newvalue: The new byte value to set in the share.
"""
log.msg("corrupt %d" % which)
def _corruptor(s, debug=False):
return s[:which] + bchr(newvalue) + s[which+1:]
self.corrupt_shares_numbered(imm_uri, [2], _corruptor)
def test_each_byte(self):
"""
Test share selection behavior of the downloader in the face of certain
kinds of data corruption.
1. upload a small share to the no-network grid
2. read all of the resulting share files out of the no-network storage servers
3. for each of
a. each byte of the share file version field
b. each byte of the immutable share version field
c. each byte of the immutable share data offset field
d. the most significant byte of the block_shares offset field
e. one of the bytes of one of the merkle trees
f. one of the bytes of the share hashes list
i. flip the least significant bit in all of the the share files
ii. perform the download/check/restore process
4. add 2 ** 24 to the share file version number
5. perform the download/check/restore process
6. add 2 ** 24 to the share version number
7. perform the download/check/restore process
The download/check/restore process is:
1. attempt to download the data
2. assert that the recovered plaintext is correct
3. assert that only the "correct" share numbers were used to reconstruct the plaintext
4. restore all of the share files to their pristine condition
"""
# Setting catalog_detection=True performs an exhaustive test of the
# Downloader's response to corruption in the lsb of each byte of the
# 2070-byte share, with two goals: make sure we tolerate all forms of
@ -1145,8 +1190,18 @@ class Corruption(_Base, unittest.TestCase):
return d
def _corrupt_flip_all(self, ign, imm_uri, which):
# type: (Any, bytes, int) -> None
"""
Flip the least significant bit at a given byte position in all share files
for the given capability.
"""
def _corruptor(s, debug=False):
return s[:which] + bchr(ord(s[which:which+1])^0x01) + s[which+1:]
# type: (bytes, bool) -> bytes
before_corruption = s[:which]
after_corruption = s[which+1:]
original_byte = s[which:which+1]
corrupt_byte = bchr(ord(original_byte) ^ 0x01)
return b"".join([before_corruption, corrupt_byte, after_corruption])
self.corrupt_all_shares(imm_uri, _corruptor)
class DownloadV2(_Base, unittest.TestCase):

View File

@ -21,6 +21,7 @@ if PY2:
from random import Random
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.task import Clock
from foolscap.api import Referenceable, RemoteException
@ -1017,16 +1018,17 @@ class _FoolscapMixin(SystemTestMixin):
self.server = s
break
assert self.server is not None, "Couldn't find StorageServer"
self._current_time = 123456
self.server._get_current_time = self.fake_time
self._clock = Clock()
self._clock.advance(123456)
self.server._clock = self._clock
def fake_time(self):
"""Return the current fake, test-controlled, time."""
return self._current_time
return self._clock.seconds()
def fake_sleep(self, seconds):
"""Advance the fake time by the given number of seconds."""
self._current_time += seconds
self._clock.advance(seconds)
@inlineCallbacks
def tearDown(self):

View File

@ -69,6 +69,8 @@ import allmydata.test.common_util as testutil
from .common import (
ConstantAddresses,
SameProcessStreamEndpointAssigner,
UseNode,
)
def port_numbers():
@ -80,11 +82,10 @@ class LoggingMultiService(service.MultiService):
# see https://tahoe-lafs.org/trac/tahoe-lafs/ticket/2946
def testing_tub(config_data=''):
def testing_tub(reactor, config_data=''):
"""
Creates a 'main' Tub for testing purposes, from config data
"""
from twisted.internet import reactor
basedir = 'dummy_basedir'
config = config_from_string(basedir, 'DEFAULT_PORTNUMFILE_BLANK', config_data)
fileutil.make_dirs(os.path.join(basedir, 'private'))
@ -112,6 +113,9 @@ class TestCase(testutil.SignalMixin, unittest.TestCase):
# try to bind the port. We'll use a low-numbered one that's likely to
# conflict with another service to prove it.
self._available_port = 22
self.port_assigner = SameProcessStreamEndpointAssigner()
self.port_assigner.setUp()
self.addCleanup(self.port_assigner.tearDown)
def _test_location(
self,
@ -137,11 +141,23 @@ class TestCase(testutil.SignalMixin, unittest.TestCase):
:param local_addresses: If not ``None`` then a list of addresses to
supply to the system under test as local addresses.
"""
from twisted.internet import reactor
basedir = self.mktemp()
create_node_dir(basedir, "testing")
if tub_port is None:
# Always configure a usable tub.port address instead of relying on
# the automatic port assignment. The automatic port assignment is
# prone to collisions and spurious test failures.
_, tub_port = self.port_assigner.assign(reactor)
config_data = "[node]\n"
if tub_port:
config_data += "tub.port = {}\n".format(tub_port)
config_data += "tub.port = {}\n".format(tub_port)
# If they wanted a certain location, go for it. This probably won't
# agree with the tub.port value we set but that only matters if
# anything tries to use this to establish a connection ... which
# nothing in this test suite will.
if tub_location is not None:
config_data += "tub.location = {}\n".format(tub_location)
@ -149,7 +165,7 @@ class TestCase(testutil.SignalMixin, unittest.TestCase):
self.patch(iputil, 'get_local_addresses_sync',
lambda: local_addresses)
tub = testing_tub(config_data)
tub = testing_tub(reactor, config_data)
class Foo(object):
pass
@ -431,7 +447,12 @@ class TestCase(testutil.SignalMixin, unittest.TestCase):
@defer.inlineCallbacks
def test_logdir_is_str(self):
basedir = "test_node/test_logdir_is_str"
from twisted.internet import reactor
basedir = FilePath(self.mktemp())
fixture = UseNode(None, None, basedir, "pb://introducer/furl", {}, reactor=reactor)
fixture.setUp()
self.addCleanup(fixture.cleanUp)
ns = Namespace()
ns.called = False
@ -440,8 +461,7 @@ class TestCase(testutil.SignalMixin, unittest.TestCase):
self.failUnless(isinstance(logdir, str), logdir)
self.patch(foolscap.logging.log, 'setLogDir', call_setLogDir)
create_node_dir(basedir, "nothing to see here")
yield client.create_client(basedir)
yield fixture.create_node()
self.failUnless(ns.called)
def test_set_config_unescaped_furl_hash(self):

View File

@ -128,7 +128,7 @@ class Bucket(unittest.TestCase):
def test_create(self):
incoming, final = self.make_workdir("test_create")
bw = BucketWriter(self, incoming, final, 200, self.make_lease())
bw = BucketWriter(self, incoming, final, 200, self.make_lease(), Clock())
bw.remote_write(0, b"a"*25)
bw.remote_write(25, b"b"*25)
bw.remote_write(50, b"c"*25)
@ -137,7 +137,7 @@ class Bucket(unittest.TestCase):
def test_readwrite(self):
incoming, final = self.make_workdir("test_readwrite")
bw = BucketWriter(self, incoming, final, 200, self.make_lease())
bw = BucketWriter(self, incoming, final, 200, self.make_lease(), Clock())
bw.remote_write(0, b"a"*25)
bw.remote_write(25, b"b"*25)
bw.remote_write(50, b"c"*7) # last block may be short
@ -155,7 +155,7 @@ class Bucket(unittest.TestCase):
incoming, final = self.make_workdir(
"test_write_past_size_errors-{}".format(i)
)
bw = BucketWriter(self, incoming, final, 200, self.make_lease())
bw = BucketWriter(self, incoming, final, 200, self.make_lease(), Clock())
with self.assertRaises(DataTooLargeError):
bw.remote_write(offset, b"a" * length)
@ -174,7 +174,7 @@ class Bucket(unittest.TestCase):
expected_data = b"".join(bchr(i) for i in range(100))
incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4()))
bw = BucketWriter(
self, incoming, final, length, self.make_lease(),
self, incoming, final, length, self.make_lease(), Clock()
)
# Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
bw.remote_write(10, expected_data[10:20])
@ -212,7 +212,7 @@ class Bucket(unittest.TestCase):
length = 100
incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4()))
bw = BucketWriter(
self, incoming, final, length, self.make_lease(),
self, incoming, final, length, self.make_lease(), Clock()
)
# Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
bw.remote_write(10, b"1" * 10)
@ -285,6 +285,67 @@ class Bucket(unittest.TestCase):
result_of_read = br.remote_read(0, len(share_data)+1)
self.failUnlessEqual(result_of_read, share_data)
def _assert_timeout_only_after_30_minutes(self, clock, bw):
"""
The ``BucketWriter`` times out and is closed after 30 minutes, but not
sooner.
"""
self.assertFalse(bw.closed)
# 29 minutes pass. Everything is fine.
for i in range(29):
clock.advance(60)
self.assertFalse(bw.closed, "Bucket closed after only %d minutes" % (i + 1,))
# After the 30th minute, the bucket is closed due to lack of writes.
clock.advance(60)
self.assertTrue(bw.closed)
def test_bucket_expires_if_no_writes_for_30_minutes(self):
"""
If a ``BucketWriter`` receives no writes for 30 minutes, it is removed.
"""
incoming, final = self.make_workdir("test_bucket_expires")
clock = Clock()
bw = BucketWriter(self, incoming, final, 200, self.make_lease(), clock)
self._assert_timeout_only_after_30_minutes(clock, bw)
def test_bucket_writes_delay_timeout(self):
"""
So long as the ``BucketWriter`` receives writes, the the removal
timeout is put off.
"""
incoming, final = self.make_workdir("test_bucket_writes_delay_timeout")
clock = Clock()
bw = BucketWriter(self, incoming, final, 200, self.make_lease(), clock)
# 29 minutes pass, getting close to the timeout...
clock.advance(29 * 60)
# .. but we receive a write! So that should delay the timeout again to
# another 30 minutes.
bw.remote_write(0, b"hello")
self._assert_timeout_only_after_30_minutes(clock, bw)
def test_bucket_closing_cancels_timeout(self):
"""
Closing cancels the ``BucketWriter`` timeout.
"""
incoming, final = self.make_workdir("test_bucket_close_timeout")
clock = Clock()
bw = BucketWriter(self, incoming, final, 10, self.make_lease(), clock)
self.assertTrue(clock.getDelayedCalls())
bw.close()
self.assertFalse(clock.getDelayedCalls())
def test_bucket_aborting_cancels_timeout(self):
"""
Closing cancels the ``BucketWriter`` timeout.
"""
incoming, final = self.make_workdir("test_bucket_abort_timeout")
clock = Clock()
bw = BucketWriter(self, incoming, final, 10, self.make_lease(), clock)
self.assertTrue(clock.getDelayedCalls())
bw.abort()
self.assertFalse(clock.getDelayedCalls())
class RemoteBucket(object):
def __init__(self, target):
@ -312,7 +373,7 @@ class BucketProxy(unittest.TestCase):
final = os.path.join(basedir, "bucket")
fileutil.make_dirs(basedir)
fileutil.make_dirs(os.path.join(basedir, "tmp"))
bw = BucketWriter(self, incoming, final, size, self.make_lease())
bw = BucketWriter(self, incoming, final, size, self.make_lease(), Clock())
rb = RemoteBucket(bw)
return bw, rb, final
@ -438,11 +499,13 @@ class Server(unittest.TestCase):
basedir = os.path.join("storage", "Server", name)
return basedir
def create(self, name, reserved_space=0, klass=StorageServer, get_current_time=time.time):
def create(self, name, reserved_space=0, klass=StorageServer, clock=None):
if clock is None:
clock = Clock()
workdir = self.workdir(name)
ss = klass(workdir, b"\x00" * 20, reserved_space=reserved_space,
stats_provider=FakeStatsProvider(),
get_current_time=get_current_time)
clock=clock)
ss.setServiceParent(self.sparent)
return ss
@ -468,14 +531,19 @@ class Server(unittest.TestCase):
sv1 = ver[b'http://allmydata.org/tahoe/protocols/storage/v1']
self.failUnlessIn(b'available-space', sv1)
def allocate(self, ss, storage_index, sharenums, size, canary=None):
def allocate(self, ss, storage_index, sharenums, size, renew_leases=True):
"""
Call directly into the storage server's allocate_buckets implementation,
skipping the Foolscap layer.
"""
renew_secret = hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret))
cancel_secret = hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret))
if not canary:
canary = FakeCanary()
return ss.remote_allocate_buckets(storage_index,
renew_secret, cancel_secret,
sharenums, size, canary)
return ss._allocate_buckets(
storage_index,
renew_secret, cancel_secret,
sharenums, size,
renew_leases=renew_leases,
)
def test_large_share(self):
syslow = platform.system().lower()
@ -554,7 +622,6 @@ class Server(unittest.TestCase):
writer.remote_abort()
self.failUnlessEqual(ss.allocated_size(), 0)
def test_allocate(self):
ss = self.create("test_allocate")
@ -608,6 +675,64 @@ class Server(unittest.TestCase):
for i,wb in writers.items():
wb.remote_abort()
def test_allocate_without_lease_renewal(self):
"""
``StorageServer._allocate_buckets`` does not renew leases on existing
shares if ``renew_leases`` is ``False``.
"""
first_lease = 456
second_lease = 543
storage_index = b"allocate"
clock = Clock()
clock.advance(first_lease)
ss = self.create(
"test_allocate_without_lease_renewal",
clock=clock,
)
# Put a share on there
already, writers = self.allocate(
ss, storage_index, [0], 1, renew_leases=False,
)
(writer,) = writers.values()
writer.remote_write(0, b"x")
writer.remote_close()
# It should have a lease granted at the current time.
shares = dict(ss._get_bucket_shares(storage_index))
self.assertEqual(
[first_lease],
list(
lease.get_grant_renew_time_time()
for lease
in ShareFile(shares[0]).get_leases()
),
)
# Let some time pass so we can tell if the lease on share 0 is
# renewed.
clock.advance(second_lease)
# Put another share on there.
already, writers = self.allocate(
ss, storage_index, [1], 1, renew_leases=False,
)
(writer,) = writers.values()
writer.remote_write(0, b"x")
writer.remote_close()
# The first share's lease expiration time is unchanged.
shares = dict(ss._get_bucket_shares(storage_index))
self.assertEqual(
[first_lease],
list(
lease.get_grant_renew_time_time()
for lease
in ShareFile(shares[0]).get_leases()
),
)
def test_bad_container_version(self):
ss = self.create("test_bad_container_version")
a,w = self.allocate(ss, b"si1", [0], 10)
@ -629,8 +754,17 @@ class Server(unittest.TestCase):
def test_disconnect(self):
# simulate a disconnection
ss = self.create("test_disconnect")
renew_secret = b"r" * 32
cancel_secret = b"c" * 32
canary = FakeCanary()
already,writers = self.allocate(ss, b"disconnect", [0,1,2], 75, canary)
already,writers = ss.remote_allocate_buckets(
b"disconnect",
renew_secret,
cancel_secret,
sharenums=[0,1,2],
allocated_size=75,
canary=canary,
)
self.failUnlessEqual(already, set())
self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
for (f,args,kwargs) in list(canary.disconnectors.values()):
@ -662,8 +796,17 @@ class Server(unittest.TestCase):
# the size we request.
OVERHEAD = 3*4
LEASE_SIZE = 4+32+32+4
renew_secret = b"r" * 32
cancel_secret = b"c" * 32
canary = FakeCanary()
already, writers = self.allocate(ss, b"vid1", [0,1,2], 1000, canary)
already, writers = ss.remote_allocate_buckets(
b"vid1",
renew_secret,
cancel_secret,
sharenums=[0,1,2],
allocated_size=1000,
canary=canary,
)
self.failUnlessEqual(len(writers), 3)
# now the StorageServer should have 3000 bytes provisionally
# allocated, allowing only 2000 more to be claimed
@ -696,7 +839,14 @@ class Server(unittest.TestCase):
# now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
# 5000-1085=3915 free, therefore we can fit 39 100byte shares
canary3 = FakeCanary()
already3, writers3 = self.allocate(ss, b"vid3", list(range(100)), 100, canary3)
already3, writers3 = ss.remote_allocate_buckets(
b"vid3",
renew_secret,
cancel_secret,
sharenums=list(range(100)),
allocated_size=100,
canary=canary3,
)
self.failUnlessEqual(len(writers3), 39)
self.failUnlessEqual(len(ss._bucket_writers), 39)
@ -755,28 +905,28 @@ class Server(unittest.TestCase):
# Create a bucket:
rs0, cs0 = self.create_bucket_5_shares(ss, b"si0")
leases = list(ss.get_leases(b"si0"))
self.failUnlessEqual(len(leases), 1)
self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
(lease,) = ss.get_leases(b"si0")
self.assertTrue(lease.is_renew_secret(rs0))
rs1, cs1 = self.create_bucket_5_shares(ss, b"si1")
# take out a second lease on si1
rs2, cs2 = self.create_bucket_5_shares(ss, b"si1", 5, 0)
leases = list(ss.get_leases(b"si1"))
self.failUnlessEqual(len(leases), 2)
self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
(lease1, lease2) = ss.get_leases(b"si1")
self.assertTrue(lease1.is_renew_secret(rs1))
self.assertTrue(lease2.is_renew_secret(rs2))
# and a third lease, using add-lease
rs2a,cs2a = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)),
hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret)))
ss.remote_add_lease(b"si1", rs2a, cs2a)
leases = list(ss.get_leases(b"si1"))
self.failUnlessEqual(len(leases), 3)
self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
(lease1, lease2, lease3) = ss.get_leases(b"si1")
self.assertTrue(lease1.is_renew_secret(rs1))
self.assertTrue(lease2.is_renew_secret(rs2))
self.assertTrue(lease3.is_renew_secret(rs2a))
# add-lease on a missing storage index is silently ignored
self.failUnlessEqual(ss.remote_add_lease(b"si18", b"", b""), None)
self.assertIsNone(ss.remote_add_lease(b"si18", b"", b""))
# check that si0 is readable
readers = ss.remote_get_buckets(b"si0")
@ -830,12 +980,12 @@ class Server(unittest.TestCase):
"""
clock = Clock()
clock.advance(123)
ss = self.create("test_immutable_add_lease_renews", get_current_time=clock.seconds)
ss = self.create("test_immutable_add_lease_renews", clock=clock)
# Start out with single lease created with bucket:
renewal_secret, cancel_secret = self.create_bucket_5_shares(ss, b"si0")
[lease] = ss.get_leases(b"si0")
self.assertEqual(lease.expiration_time, 123 + DEFAULT_RENEWAL_TIME)
self.assertEqual(lease.get_expiration_time(), 123 + DEFAULT_RENEWAL_TIME)
# Time passes:
clock.advance(123456)
@ -843,7 +993,7 @@ class Server(unittest.TestCase):
# Adding a lease with matching renewal secret just renews it:
ss.remote_add_lease(b"si0", renewal_secret, cancel_secret)
[lease] = ss.get_leases(b"si0")
self.assertEqual(lease.expiration_time, 123 + 123456 + DEFAULT_RENEWAL_TIME)
self.assertEqual(lease.get_expiration_time(), 123 + 123456 + DEFAULT_RENEWAL_TIME)
def test_have_shares(self):
"""By default the StorageServer has no shares."""
@ -944,10 +1094,12 @@ class MutableServer(unittest.TestCase):
basedir = os.path.join("storage", "MutableServer", name)
return basedir
def create(self, name, get_current_time=time.time):
def create(self, name, clock=None):
workdir = self.workdir(name)
if clock is None:
clock = Clock()
ss = StorageServer(workdir, b"\x00" * 20,
get_current_time=get_current_time)
clock=clock)
ss.setServiceParent(self.sparent)
return ss
@ -1230,17 +1382,6 @@ class MutableServer(unittest.TestCase):
self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
self.failUnlessEqual(a.nodeid, b.nodeid)
def compare_leases(self, leases_a, leases_b):
self.failUnlessEqual(len(leases_a), len(leases_b))
for i in range(len(leases_a)):
a = leases_a[i]
b = leases_b[i]
self.failUnlessEqual(a.owner_num, b.owner_num)
self.failUnlessEqual(a.renew_secret, b.renew_secret)
self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
self.failUnlessEqual(a.nodeid, b.nodeid)
self.failUnlessEqual(a.expiration_time, b.expiration_time)
def test_leases(self):
ss = self.create("test_leases")
def secrets(n):
@ -1321,11 +1462,11 @@ class MutableServer(unittest.TestCase):
self.failUnlessIn("I have leases accepted by nodeids:", e_s)
self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)
self.compare_leases(all_leases, list(s0.get_leases()))
self.assertEqual(all_leases, list(s0.get_leases()))
# reading shares should not modify the timestamp
read(b"si1", [], [(0,200)])
self.compare_leases(all_leases, list(s0.get_leases()))
self.assertEqual(all_leases, list(s0.get_leases()))
write(b"si1", secrets(0),
{0: ([], [(200, b"make me bigger")], None)}, [])
@ -1343,7 +1484,7 @@ class MutableServer(unittest.TestCase):
clock = Clock()
clock.advance(235)
ss = self.create("test_mutable_add_lease_renews",
get_current_time=clock.seconds)
clock=clock)
def secrets(n):
return ( self.write_enabler(b"we1"),
self.renew_secret(b"we1-%d" % n),
@ -1359,7 +1500,7 @@ class MutableServer(unittest.TestCase):
"shares", storage_index_to_dir(b"si1"))
s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
[lease] = s0.get_leases()
self.assertEqual(lease.expiration_time, 235 + DEFAULT_RENEWAL_TIME)
self.assertEqual(lease.get_expiration_time(), 235 + DEFAULT_RENEWAL_TIME)
# Time passes...
clock.advance(835)
@ -1367,7 +1508,7 @@ class MutableServer(unittest.TestCase):
# Adding a lease renews it:
ss.remote_add_lease(b"si1", renew_secret, cancel_secret)
[lease] = s0.get_leases()
self.assertEqual(lease.expiration_time,
self.assertEqual(lease.get_expiration_time(),
235 + 835 + DEFAULT_RENEWAL_TIME)
def test_remove(self):
@ -3039,3 +3180,102 @@ class ShareFileTests(unittest.TestCase):
sf = self.get_sharefile()
with self.assertRaises(IndexError):
sf.cancel_lease(b"garbage")
def test_renew_secret(self):
"""
A lease loaded from a share file can have its renew secret verified.
"""
renew_secret = b"r" * 32
cancel_secret = b"c" * 32
expiration_time = 2 ** 31
sf = self.get_sharefile()
lease = LeaseInfo(
owner_num=0,
renew_secret=renew_secret,
cancel_secret=cancel_secret,
expiration_time=expiration_time,
)
sf.add_lease(lease)
(loaded_lease,) = sf.get_leases()
self.assertTrue(loaded_lease.is_renew_secret(renew_secret))
def test_cancel_secret(self):
"""
A lease loaded from a share file can have its cancel secret verified.
"""
renew_secret = b"r" * 32
cancel_secret = b"c" * 32
expiration_time = 2 ** 31
sf = self.get_sharefile()
lease = LeaseInfo(
owner_num=0,
renew_secret=renew_secret,
cancel_secret=cancel_secret,
expiration_time=expiration_time,
)
sf.add_lease(lease)
(loaded_lease,) = sf.get_leases()
self.assertTrue(loaded_lease.is_cancel_secret(cancel_secret))
class LeaseInfoTests(unittest.TestCase):
"""
Tests for ``allmydata.storage.lease.LeaseInfo``.
"""
def test_is_renew_secret(self):
"""
``LeaseInfo.is_renew_secret`` returns ``True`` if the value given is the
renew secret.
"""
renew_secret = b"r" * 32
cancel_secret = b"c" * 32
lease = LeaseInfo(
owner_num=1,
renew_secret=renew_secret,
cancel_secret=cancel_secret,
)
self.assertTrue(lease.is_renew_secret(renew_secret))
def test_is_not_renew_secret(self):
"""
``LeaseInfo.is_renew_secret`` returns ``False`` if the value given is not
the renew secret.
"""
renew_secret = b"r" * 32
cancel_secret = b"c" * 32
lease = LeaseInfo(
owner_num=1,
renew_secret=renew_secret,
cancel_secret=cancel_secret,
)
self.assertFalse(lease.is_renew_secret(cancel_secret))
def test_is_cancel_secret(self):
"""
``LeaseInfo.is_cancel_secret`` returns ``True`` if the value given is the
cancel secret.
"""
renew_secret = b"r" * 32
cancel_secret = b"c" * 32
lease = LeaseInfo(
owner_num=1,
renew_secret=renew_secret,
cancel_secret=cancel_secret,
)
self.assertTrue(lease.is_cancel_secret(cancel_secret))
def test_is_not_cancel_secret(self):
"""
``LeaseInfo.is_cancel_secret`` returns ``False`` if the value given is not
the cancel secret.
"""
renew_secret = b"r" * 32
cancel_secret = b"c" * 32
lease = LeaseInfo(
owner_num=1,
renew_secret=renew_secret,
cancel_secret=cancel_secret,
)
self.assertFalse(lease.is_cancel_secret(renew_secret))

View File

@ -485,17 +485,7 @@ class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin):
return d
def backdate_lease(self, sf, renew_secret, new_expire_time):
# ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
# "renew" a lease with a new_expire_time that is older than what the
# current lease has), so we have to reach inside it.
for i,lease in enumerate(sf.get_leases()):
if lease.renew_secret == renew_secret:
lease.expiration_time = new_expire_time
f = open(sf.home, 'rb+')
sf._write_lease_record(f, i, lease)
f.close()
return
raise IndexError("unable to renew non-existent lease")
sf.renew_lease(renew_secret, new_expire_time, allow_backdate=True)
def test_expire_age(self):
basedir = "storage/LeaseCrawler/expire_age"

View File

@ -23,6 +23,7 @@ from twisted.internet import defer
from allmydata import uri
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import ShareFile
from allmydata.storage.server import si_a2b
from allmydata.immutable import offloaded, upload
from allmydata.immutable.literal import LiteralFileNode
@ -1290,9 +1291,9 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
# are sharefiles here
filename = os.path.join(dirpath, filenames[0])
# peek at the magic to see if it is a chk share
magic = open(filename, "rb").read(4)
if magic == b'\x00\x00\x00\x01':
break
with open(filename, "rb") as f:
if ShareFile.is_valid_header(f.read(32)):
break
else:
self.fail("unable to find any uri_extension files in %r"
% self.basedir)

View File

@ -83,12 +83,18 @@ def create_introducer_webish(reactor, port_assigner, basedir):
with the node and its webish service.
"""
node.create_node_dir(basedir, "testing")
_, port_endpoint = port_assigner.assign(reactor)
main_tub_location, main_tub_endpoint = port_assigner.assign(reactor)
_, web_port_endpoint = port_assigner.assign(reactor)
with open(join(basedir, "tahoe.cfg"), "w") as f:
f.write(
"[node]\n"
"tub.location = 127.0.0.1:1\n" +
"web.port = {}\n".format(port_endpoint)
"tub.port = {main_tub_endpoint}\n"
"tub.location = {main_tub_location}\n"
"web.port = {web_port_endpoint}\n".format(
main_tub_endpoint=main_tub_endpoint,
main_tub_location=main_tub_location,
web_port_endpoint=web_port_endpoint,
)
)
intro_node = yield create_introducer(basedir)