Merge remote-tracking branch 'origin/master' into 3942.missing-authorization-handling

Jean-Paul Calderone 2022-12-21 05:51:39 -05:00
commit 6b57b03157
21 changed files with 543 additions and 847 deletions

View File

@ -9,7 +9,7 @@ BASIC_DEPS="pip wheel"
# Python packages we need to support the test infrastructure. *Not* packages
# Tahoe-LAFS itself (implementation or test suite) needs.
TEST_DEPS="tox codecov"
TEST_DEPS="tox~=3.0 codecov"
# Python packages we need to generate test reports for CI infrastructure.
# *Not* packages Tahoe-LAFS itself (implementation or test suite) needs.

View File

@ -6,6 +6,16 @@ on:
- "master"
pull_request:
# At the start of each workflow run, GitHub creates a unique
# GITHUB_TOKEN secret to use in the workflow. It is a good idea for
# this GITHUB_TOKEN to have the minimum of permissions. See:
#
# - https://docs.github.com/en/actions/security-guides/automatic-token-authentication
# - https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#permissions
#
permissions:
contents: read
# Control to what degree jobs in this workflow will run concurrently with
# other instances of themselves.
#
@ -53,42 +63,24 @@ jobs:
python-version: "pypy-3.7"
- os: ubuntu-latest
python-version: "pypy-3.8"
steps:
# See https://github.com/actions/checkout. A fetch-depth of 0
# fetches all tags and branches.
- name: Check out Tahoe-LAFS sources
uses: actions/checkout@v2
uses: actions/checkout@v3
with:
fetch-depth: 0
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
# To use pip caching with GitHub Actions in an OS-independent
# manner, we need `pip cache dir` command, which became
# available since pip v20.1+. At the time of writing this,
# GitHub Actions offers pip v20.3.3 for both ubuntu-latest and
# windows-latest, and pip v20.3.1 for macos-latest.
- name: Get pip cache directory
id: pip-cache
run: |
echo "::set-output name=dir::$(pip cache dir)"
# See https://github.com/actions/cache
- name: Use pip cache
uses: actions/cache@v2
with:
path: ${{ steps.pip-cache.outputs.dir }}
key: ${{ runner.os }}-pip-${{ hashFiles('**/setup.py') }}
restore-keys: |
${{ runner.os }}-pip-
cache: 'pip' # caching pip dependencies
- name: Install Python packages
run: |
pip install --upgrade codecov tox tox-gh-actions setuptools
pip install --upgrade codecov "tox<4" tox-gh-actions setuptools
pip list
- name: Display tool versions
@ -98,13 +90,13 @@ jobs:
run: python -m tox
- name: Upload eliot.log
uses: actions/upload-artifact@v1
uses: actions/upload-artifact@v3
with:
name: eliot.log
path: eliot.log
- name: Upload trial log
uses: actions/upload-artifact@v1
uses: actions/upload-artifact@v3
with:
name: test.log
path: _trial_temp/test.log
@ -190,36 +182,24 @@ jobs:
- name: Install Tor [Windows]
if: matrix.os == 'windows-latest'
uses: crazy-max/ghaction-chocolatey@v1
uses: crazy-max/ghaction-chocolatey@v2
with:
args: install tor
- name: Check out Tahoe-LAFS sources
uses: actions/checkout@v2
uses: actions/checkout@v3
with:
fetch-depth: 0
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Get pip cache directory
id: pip-cache
run: |
echo "::set-output name=dir::$(pip cache dir)"
- name: Use pip cache
uses: actions/cache@v2
with:
path: ${{ steps.pip-cache.outputs.dir }}
key: ${{ runner.os }}-pip-${{ hashFiles('**/setup.py') }}
restore-keys: |
${{ runner.os }}-pip-
cache: 'pip' # caching pip dependencies
- name: Install Python packages
run: |
pip install --upgrade tox
pip install --upgrade "tox<4"
pip list
- name: Display tool versions
@ -234,7 +214,7 @@ jobs:
run: tox -e integration
- name: Upload eliot.log in case of failure
uses: actions/upload-artifact@v1
uses: actions/upload-artifact@v3
if: failure()
with:
name: integration.eliot.json
@ -255,31 +235,19 @@ jobs:
steps:
- name: Check out Tahoe-LAFS sources
uses: actions/checkout@v2
uses: actions/checkout@v3
with:
fetch-depth: 0
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Get pip cache directory
id: pip-cache
run: |
echo "::set-output name=dir::$(pip cache dir)"
- name: Use pip cache
uses: actions/cache@v2
with:
path: ${{ steps.pip-cache.outputs.dir }}
key: ${{ runner.os }}-pip-${{ hashFiles('**/setup.py') }}
restore-keys: |
${{ runner.os }}-pip-
cache: 'pip' # caching pip dependencies
- name: Install Python packages
run: |
pip install --upgrade tox
pip install --upgrade "tox<4"
pip list
- name: Display tool versions
@ -293,7 +261,7 @@ jobs:
run: dist/Tahoe-LAFS/tahoe --version
- name: Upload PyInstaller package
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
with:
name: Tahoe-LAFS-${{ matrix.os }}-Python-${{ matrix.python-version }}
path: dist/Tahoe-LAFS-*-*.*

0
newsfragments/3874.minor Normal file
View File

View File

@ -0,0 +1,5 @@
`tahoe run ...` will now exit when its stdin is closed.
This facilitates subprocess management, specifically cleanup.
When a parent process that is running tahoe exits without time to do "proper" cleanup, at least the stdin descriptor will be closed.
"tahoe run" then notices this and exits.

View File

@ -0,0 +1 @@
Uploading immutables will now better use available bandwidth, which should allow for faster uploads in many cases.

0
newsfragments/3944.minor Normal file
View File

0
newsfragments/3947.minor Normal file
View File

0
newsfragments/3950.minor Normal file
View File

View File

@ -96,7 +96,9 @@ install_requires = [
# an sftp extra in Tahoe-LAFS, there is no point in having one.
# * Twisted 19.10 introduces Site.getContentFile which we use to get
# temporary upload files placed into a per-node temporary directory.
"Twisted[tls,conch] >= 19.10.0",
# * Twisted 22.8.0 added support for coroutine-returning functions in many
# places (mainly via `maybeDeferred`)
"Twisted[tls,conch] >= 22.8.0",
"PyYAML >= 3.11",

View File

@ -262,6 +262,8 @@ class Encoder(object):
d.addCallback(lambda res: self.finish_hashing())
# These calls have to happen in order; layout.py now requires writes to
# be appended to the data written so far.
d.addCallback(lambda res:
self.send_crypttext_hash_tree_to_all_shareholders())
d.addCallback(lambda res: self.send_all_block_hash_trees())

View File

@ -1,21 +1,18 @@
"""
Ported to Python 3.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
from __future__ import annotations
import struct
from io import BytesIO
from attrs import define, field
from zope.interface import implementer
from twisted.internet import defer
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
FileTooLargeError, HASH_SIZE
from allmydata.util import mathutil, observer, pipeline, log
from allmydata.util import mathutil, observer, log
from allmydata.util.assertutil import precondition
from allmydata.storage.server import si_b2a
@ -107,19 +104,58 @@ def make_write_bucket_proxy(rref, server,
num_share_hashes, uri_extension_size)
return wbp
@define
class _WriteBuffer:
"""
Queue up small writes to be written in a single batched larger write.
"""
_batch_size: int
_to_write : BytesIO = field(factory=BytesIO)
_written_bytes : int = field(default=0)
def queue_write(self, data: bytes) -> bool:
"""
Queue a write. If the result is ``False``, no further action is needed
for now. If the result is ``True``, it's time to call ``flush()``
and do a real write.
"""
self._to_write.write(data)
return self.get_queued_bytes() >= self._batch_size
def flush(self) -> tuple[int, bytes]:
"""Return offset and data to be written."""
offset = self._written_bytes
data = self._to_write.getvalue()
self._written_bytes += len(data)
self._to_write = BytesIO()
return (offset, data)
def get_queued_bytes(self) -> int:
"""Return number of queued, unwritten bytes."""
return self._to_write.tell()
def get_total_bytes(self) -> int:
"""Return how many bytes were written or queued in total."""
return self._written_bytes + self.get_queued_bytes()
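As a rough, standalone illustration of how this buffer behaves (not part of the change; the 100-byte batch size and chunk sizes are arbitrary), writes must be queued in order with no holes, matching the expectation in WriteBucketProxy._queue_write below. With the default ~1 MB batch and 50 ms of round-trip latency, this batching bounds uploads at roughly 1 MB / 0.05 s = 20 MB/s, as the comment in WriteBucketProxy.__init__ notes.

    wb = _WriteBuffer(batch_size=100)
    for chunk in [b"a" * 60, b"b" * 30, b"c" * 40, b"d" * 25]:
        if wb.queue_write(chunk):
            # Queued bytes reached the batch size: do one real remote write.
            offset, data = wb.flush()
            print("write", offset, len(data))    # -> write 0 130
    # Anything still queued goes out as a final write before "close".
    if wb.get_queued_bytes() > 0:
        offset, data = wb.flush()
        print("write", offset, len(data))        # -> write 130 25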
@implementer(IStorageBucketWriter)
class WriteBucketProxy(object):
"""
Note: The various ``put_`` methods need to be called in the order in which the
bytes will get written.
"""
fieldsize = 4
fieldstruct = ">L"
def __init__(self, rref, server, data_size, block_size, num_segments,
num_share_hashes, uri_extension_size, pipeline_size=50000):
num_share_hashes, uri_extension_size, batch_size=1_000_000):
self._rref = rref
self._server = server
self._data_size = data_size
self._block_size = block_size
self._num_segments = num_segments
self._written_bytes = 0
effective_segments = mathutil.next_power_of_k(num_segments,2)
self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
@ -130,11 +166,13 @@ class WriteBucketProxy(object):
self._create_offsets(block_size, data_size)
# k=3, max_segment_size=128KiB gives us a typical segment of 43691
# bytes. Setting the default pipeline_size to 50KB lets us get two
# segments onto the wire but not a third, which would keep the pipe
# filled.
self._pipeline = pipeline.Pipeline(pipeline_size)
# With a ~1MB batch size, max upload speed is 1MB/(round-trip latency)
# assuming the writing code waits for writes to finish, so 20MB/sec if
# latency is 50ms. In the US many people only have 1MB/sec upload speed
# as of 2022 (standard Comcast). For further discussion of how one
# might set batch sizes see
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3787#comment:1.
self._write_buffer = _WriteBuffer(batch_size)
def get_allocated_size(self):
return (self._offsets['uri_extension'] + self.fieldsize +
@ -179,7 +217,7 @@ class WriteBucketProxy(object):
return "<WriteBucketProxy for node %r>" % self._server.get_name()
def put_header(self):
return self._write(0, self._offset_data)
return self._queue_write(0, self._offset_data)
def put_block(self, segmentnum, data):
offset = self._offsets['data'] + segmentnum * self._block_size
@ -193,13 +231,13 @@ class WriteBucketProxy(object):
(self._block_size *
(self._num_segments - 1))),
len(data), self._block_size)
return self._write(offset, data)
return self._queue_write(offset, data)
def put_crypttext_hashes(self, hashes):
# plaintext_hash_tree precedes crypttext_hash_tree. It is not used, and
# so is not explicitly written, but we need to write everything, so
# fill it in with nulls.
d = self._write(self._offsets['plaintext_hash_tree'], b"\x00" * self._segment_hash_size)
d = self._queue_write(self._offsets['plaintext_hash_tree'], b"\x00" * self._segment_hash_size)
d.addCallback(lambda _: self._really_put_crypttext_hashes(hashes))
return d
@ -212,7 +250,7 @@ class WriteBucketProxy(object):
precondition(offset + len(data) <= self._offsets['block_hashes'],
offset, len(data), offset+len(data),
self._offsets['block_hashes'])
return self._write(offset, data)
return self._queue_write(offset, data)
def put_block_hashes(self, blockhashes):
offset = self._offsets['block_hashes']
@ -223,7 +261,7 @@ class WriteBucketProxy(object):
precondition(offset + len(data) <= self._offsets['share_hashes'],
offset, len(data), offset+len(data),
self._offsets['share_hashes'])
return self._write(offset, data)
return self._queue_write(offset, data)
def put_share_hashes(self, sharehashes):
# sharehashes is a list of (index, hash) tuples, so they get stored
@ -237,29 +275,45 @@ class WriteBucketProxy(object):
precondition(offset + len(data) <= self._offsets['uri_extension'],
offset, len(data), offset+len(data),
self._offsets['uri_extension'])
return self._write(offset, data)
return self._queue_write(offset, data)
def put_uri_extension(self, data):
offset = self._offsets['uri_extension']
assert isinstance(data, bytes)
precondition(len(data) == self._uri_extension_size)
length = struct.pack(self.fieldstruct, len(data))
return self._write(offset, length+data)
return self._queue_write(offset, length+data)
def _write(self, offset, data):
# use a Pipeline to pipeline several writes together. TODO: another
# speedup would be to coalesce small writes into a single call: this
# would reduce the foolscap CPU overhead per share, but wouldn't
# reduce the number of round trips, so it might not be worth the
# effort.
self._written_bytes += len(data)
return self._pipeline.add(len(data),
self._rref.callRemote, "write", offset, data)
def _queue_write(self, offset, data):
"""
This queues up small writes to be written in a single batched larger
write.
Callers of this function are expected to queue the data in order, with
no holes. As such, the offset is technically unnecessary, but is used
to check the inputs. Possibly we should get rid of it.
"""
assert offset == self._write_buffer.get_total_bytes()
if self._write_buffer.queue_write(data):
return self._actually_write()
else:
return defer.succeed(False)
def _actually_write(self):
"""Write data to the server."""
offset, data = self._write_buffer.flush()
return self._rref.callRemote("write", offset, data)
def close(self):
assert self._written_bytes == self.get_allocated_size(), f"{self._written_bytes} != {self.get_allocated_size()}"
d = self._pipeline.add(0, self._rref.callRemote, "close")
d.addCallback(lambda ign: self._pipeline.flush())
assert self._write_buffer.get_total_bytes() == self.get_allocated_size(), (
f"{self._written_buffer.get_total_bytes_queued()} != {self.get_allocated_size()}"
)
if self._write_buffer.get_queued_bytes() > 0:
d = self._actually_write()
else:
# No data queued, don't send empty string write.
d = defer.succeed(True)
d.addCallback(lambda _: self._rref.callRemote("close"))
return d
def abort(self):
@ -371,16 +425,16 @@ class ReadBucketProxy(object):
self._fieldsize = fieldsize
self._fieldstruct = fieldstruct
for field in ( 'data',
'plaintext_hash_tree', # UNUSED
'crypttext_hash_tree',
'block_hashes',
'share_hashes',
'uri_extension',
):
for field_name in ( 'data',
'plaintext_hash_tree', # UNUSED
'crypttext_hash_tree',
'block_hashes',
'share_hashes',
'uri_extension',
):
offset = struct.unpack(fieldstruct, data[x:x+fieldsize])[0]
x += fieldsize
self._offsets[field] = offset
self._offsets[field_name] = offset
return self._offsets
def _get_block_data(self, unused, blocknum, blocksize, thisblocksize):

View File

@ -21,7 +21,11 @@ from twisted.scripts import twistd
from twisted.python import usage
from twisted.python.filepath import FilePath
from twisted.python.reflect import namedAny
from twisted.internet.defer import maybeDeferred
from twisted.python.failure import Failure
from twisted.internet.defer import maybeDeferred, Deferred
from twisted.internet.protocol import Protocol
from twisted.internet.stdio import StandardIO
from twisted.internet.error import ReactorNotRunning
from twisted.application.service import Service
from allmydata.scripts.default_nodedir import _default_nodedir
@ -155,6 +159,8 @@ class DaemonizeTheRealService(Service, HookMixin):
def startService(self):
from twisted.internet import reactor
def start():
node_to_instance = {
u"client": lambda: maybeDeferred(namedAny("allmydata.client.create_client"), self.basedir),
@ -194,12 +200,14 @@ class DaemonizeTheRealService(Service, HookMixin):
def created(srv):
srv.setServiceParent(self.parent)
# exiting on stdin-closed facilitates cleanup when run
# as a subprocess
on_stdin_close(reactor, reactor.stop)
d.addCallback(created)
d.addErrback(handle_config_error)
d.addBoth(self._call_hook, 'running')
return d
from twisted.internet import reactor
reactor.callWhenRunning(start)
@ -213,6 +221,46 @@ class DaemonizeTahoeNodePlugin(object):
return DaemonizeTheRealService(self.nodetype, self.basedir, so)
def on_stdin_close(reactor, fn):
"""
Arrange for the function `fn` to run when our stdin closes
"""
when_closed_d = Deferred()
class WhenClosed(Protocol):
"""
Notify a Deferred when our connection is lost .. as this is passed
to twisted's StandardIO class, it is used to detect our parent
going away.
"""
def connectionLost(self, reason):
when_closed_d.callback(None)
def on_close(arg):
try:
fn()
except ReactorNotRunning:
pass
except Exception:
# for our "exit" use-case failures will _mostly_ just be
# ReactorNotRunning (because we're already shutting down
# when our stdin closes) but no matter what "bad thing"
# happens we just want to ignore it .. although other
# errors might be interesting so we'll log those
print(Failure())
return arg
when_closed_d.addBoth(on_close)
# we don't need to do anything with this instance because it gets
# hooked into the reactor and thus remembered .. but we return it
# for Windows testing purposes.
return StandardIO(
proto=WhenClosed(),
reactor=reactor,
)
def run(reactor, config, runApp=twistd.runApp):
"""
Runs a Tahoe-LAFS node in the foreground.

View File

@ -1,19 +1,12 @@
"""
Ported to Python 3.
Tests related to the way ``allmydata.mutable`` handles different versions
of data for an object.
"""
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
from io import StringIO
import os
from six.moves import cStringIO as StringIO
from typing import Optional
from twisted.internet import defer
from ..common import AsyncTestCase
from testtools.matchers import (
Equals,
@ -47,343 +40,268 @@ class Version(GridTestMixin, AsyncTestCase, testutil.ShouldFailMixin, \
self.small_data = b"test data" * 10 # 90 B; SDMF
def do_upload_mdmf(self, data=None):
async def do_upload_mdmf(self, data: Optional[bytes] = None) -> MutableFileNode:
if data is None:
data = self.data
d = self.nm.create_mutable_file(MutableData(data),
version=MDMF_VERSION)
def _then(n):
self.assertThat(n, IsInstance(MutableFileNode))
self.assertThat(n._protocol_version, Equals(MDMF_VERSION))
self.mdmf_node = n
return n
d.addCallback(_then)
return d
n = await self.nm.create_mutable_file(MutableData(data),
version=MDMF_VERSION)
self.assertThat(n, IsInstance(MutableFileNode))
self.assertThat(n._protocol_version, Equals(MDMF_VERSION))
self.mdmf_node = n
return n
def do_upload_sdmf(self, data=None):
async def do_upload_sdmf(self, data: Optional[bytes] = None) -> MutableFileNode:
if data is None:
data = self.small_data
d = self.nm.create_mutable_file(MutableData(data))
def _then(n):
self.assertThat(n, IsInstance(MutableFileNode))
self.assertThat(n._protocol_version, Equals(SDMF_VERSION))
self.sdmf_node = n
return n
d.addCallback(_then)
return d
n = await self.nm.create_mutable_file(MutableData(data))
self.assertThat(n, IsInstance(MutableFileNode))
self.assertThat(n._protocol_version, Equals(SDMF_VERSION))
self.sdmf_node = n
return n
def do_upload_empty_sdmf(self):
d = self.nm.create_mutable_file(MutableData(b""))
def _then(n):
self.assertThat(n, IsInstance(MutableFileNode))
self.sdmf_zero_length_node = n
self.assertThat(n._protocol_version, Equals(SDMF_VERSION))
return n
d.addCallback(_then)
return d
async def do_upload_empty_sdmf(self) -> MutableFileNode:
n = await self.nm.create_mutable_file(MutableData(b""))
self.assertThat(n, IsInstance(MutableFileNode))
self.sdmf_zero_length_node = n
self.assertThat(n._protocol_version, Equals(SDMF_VERSION))
return n
def do_upload(self):
d = self.do_upload_mdmf()
d.addCallback(lambda ign: self.do_upload_sdmf())
return d
async def do_upload(self) -> MutableFileNode:
await self.do_upload_mdmf()
return await self.do_upload_sdmf()
def test_debug(self):
d = self.do_upload_mdmf()
def _debug(n):
fso = debug.FindSharesOptions()
storage_index = base32.b2a(n.get_storage_index())
fso.si_s = str(storage_index, "utf-8") # command-line options are unicode on Python 3
fso.nodedirs = [os.path.dirname(abspath_expanduser_unicode(str(storedir)))
for (i,ss,storedir)
in self.iterate_servers()]
fso.stdout = StringIO()
fso.stderr = StringIO()
debug.find_shares(fso)
sharefiles = fso.stdout.getvalue().splitlines()
expected = self.nm.default_encoding_parameters["n"]
self.assertThat(sharefiles, HasLength(expected))
async def test_debug(self) -> None:
n = await self.do_upload_mdmf()
fso = debug.FindSharesOptions()
storage_index = base32.b2a(n.get_storage_index())
fso.si_s = str(storage_index, "utf-8") # command-line options are unicode on Python 3
fso.nodedirs = [os.path.dirname(abspath_expanduser_unicode(str(storedir)))
for (i,ss,storedir)
in self.iterate_servers()]
fso.stdout = StringIO()
fso.stderr = StringIO()
debug.find_shares(fso)
sharefiles = fso.stdout.getvalue().splitlines()
expected = self.nm.default_encoding_parameters["n"]
self.assertThat(sharefiles, HasLength(expected))
do = debug.DumpOptions()
do["filename"] = sharefiles[0]
do.stdout = StringIO()
debug.dump_share(do)
output = do.stdout.getvalue()
lines = set(output.splitlines())
self.assertTrue("Mutable slot found:" in lines, output)
self.assertTrue(" share_type: MDMF" in lines, output)
self.assertTrue(" num_extra_leases: 0" in lines, output)
self.assertTrue(" MDMF contents:" in lines, output)
self.assertTrue(" seqnum: 1" in lines, output)
self.assertTrue(" required_shares: 3" in lines, output)
self.assertTrue(" total_shares: 10" in lines, output)
self.assertTrue(" segsize: 131073" in lines, output)
self.assertTrue(" datalen: %d" % len(self.data) in lines, output)
vcap = str(n.get_verify_cap().to_string(), "utf-8")
self.assertTrue(" verify-cap: %s" % vcap in lines, output)
cso = debug.CatalogSharesOptions()
cso.nodedirs = fso.nodedirs
cso.stdout = StringIO()
cso.stderr = StringIO()
debug.catalog_shares(cso)
shares = cso.stdout.getvalue().splitlines()
oneshare = shares[0] # all shares should be MDMF
self.failIf(oneshare.startswith("UNKNOWN"), oneshare)
self.assertTrue(oneshare.startswith("MDMF"), oneshare)
fields = oneshare.split()
self.assertThat(fields[0], Equals("MDMF"))
self.assertThat(fields[1].encode("ascii"), Equals(storage_index))
self.assertThat(fields[2], Equals("3/10"))
self.assertThat(fields[3], Equals("%d" % len(self.data)))
self.assertTrue(fields[4].startswith("#1:"), fields[3])
# the rest of fields[4] is the roothash, which depends upon
# encryption salts and is not constant. fields[5] is the
# remaining time on the longest lease, which is timing dependent.
# The rest of the line is the quoted pathname to the share.
d.addCallback(_debug)
return d
do = debug.DumpOptions()
do["filename"] = sharefiles[0]
do.stdout = StringIO()
debug.dump_share(do)
output = do.stdout.getvalue()
lines = set(output.splitlines())
self.assertTrue("Mutable slot found:" in lines, output)
self.assertTrue(" share_type: MDMF" in lines, output)
self.assertTrue(" num_extra_leases: 0" in lines, output)
self.assertTrue(" MDMF contents:" in lines, output)
self.assertTrue(" seqnum: 1" in lines, output)
self.assertTrue(" required_shares: 3" in lines, output)
self.assertTrue(" total_shares: 10" in lines, output)
self.assertTrue(" segsize: 131073" in lines, output)
self.assertTrue(" datalen: %d" % len(self.data) in lines, output)
vcap = str(n.get_verify_cap().to_string(), "utf-8")
self.assertTrue(" verify-cap: %s" % vcap in lines, output)
cso = debug.CatalogSharesOptions()
cso.nodedirs = fso.nodedirs
cso.stdout = StringIO()
cso.stderr = StringIO()
debug.catalog_shares(cso)
shares = cso.stdout.getvalue().splitlines()
oneshare = shares[0] # all shares should be MDMF
self.failIf(oneshare.startswith("UNKNOWN"), oneshare)
self.assertTrue(oneshare.startswith("MDMF"), oneshare)
fields = oneshare.split()
self.assertThat(fields[0], Equals("MDMF"))
self.assertThat(fields[1].encode("ascii"), Equals(storage_index))
self.assertThat(fields[2], Equals("3/10"))
self.assertThat(fields[3], Equals("%d" % len(self.data)))
self.assertTrue(fields[4].startswith("#1:"), fields[3])
# the rest of fields[4] is the roothash, which depends upon
# encryption salts and is not constant. fields[5] is the
# remaining time on the longest lease, which is timing dependent.
# The rest of the line is the quoted pathname to the share.
async def test_get_sequence_number(self) -> None:
await self.do_upload()
bv = await self.mdmf_node.get_best_readable_version()
self.assertThat(bv.get_sequence_number(), Equals(1))
bv = await self.sdmf_node.get_best_readable_version()
self.assertThat(bv.get_sequence_number(), Equals(1))
def test_get_sequence_number(self):
d = self.do_upload()
d.addCallback(lambda ign: self.mdmf_node.get_best_readable_version())
d.addCallback(lambda bv:
self.assertThat(bv.get_sequence_number(), Equals(1)))
d.addCallback(lambda ignored:
self.sdmf_node.get_best_readable_version())
d.addCallback(lambda bv:
self.assertThat(bv.get_sequence_number(), Equals(1)))
# Now update. The sequence number in both cases should be 1 in
# both cases.
def _do_update(ignored):
new_data = MutableData(b"foo bar baz" * 100000)
new_small_data = MutableData(b"foo bar baz" * 10)
d1 = self.mdmf_node.overwrite(new_data)
d2 = self.sdmf_node.overwrite(new_small_data)
dl = gatherResults([d1, d2])
return dl
d.addCallback(_do_update)
d.addCallback(lambda ignored:
self.mdmf_node.get_best_readable_version())
d.addCallback(lambda bv:
self.assertThat(bv.get_sequence_number(), Equals(2)))
d.addCallback(lambda ignored:
self.sdmf_node.get_best_readable_version())
d.addCallback(lambda bv:
self.assertThat(bv.get_sequence_number(), Equals(2)))
return d
new_data = MutableData(b"foo bar baz" * 100000)
new_small_data = MutableData(b"foo bar baz" * 10)
d1 = self.mdmf_node.overwrite(new_data)
d2 = self.sdmf_node.overwrite(new_small_data)
await gatherResults([d1, d2])
bv = await self.mdmf_node.get_best_readable_version()
self.assertThat(bv.get_sequence_number(), Equals(2))
bv = await self.sdmf_node.get_best_readable_version()
self.assertThat(bv.get_sequence_number(), Equals(2))
def test_cap_after_upload(self):
async def test_cap_after_upload(self) -> None:
# If we create a new mutable file and upload things to it, and
# it's an MDMF file, we should get an MDMF cap back from that
# file and should be able to use that.
# That's essentially what MDMF node is, so just check that.
d = self.do_upload_mdmf()
def _then(ign):
mdmf_uri = self.mdmf_node.get_uri()
cap = uri.from_string(mdmf_uri)
self.assertTrue(isinstance(cap, uri.WriteableMDMFFileURI))
readonly_mdmf_uri = self.mdmf_node.get_readonly_uri()
cap = uri.from_string(readonly_mdmf_uri)
self.assertTrue(isinstance(cap, uri.ReadonlyMDMFFileURI))
d.addCallback(_then)
return d
await self.do_upload_mdmf()
mdmf_uri = self.mdmf_node.get_uri()
cap = uri.from_string(mdmf_uri)
self.assertTrue(isinstance(cap, uri.WriteableMDMFFileURI))
readonly_mdmf_uri = self.mdmf_node.get_readonly_uri()
cap = uri.from_string(readonly_mdmf_uri)
self.assertTrue(isinstance(cap, uri.ReadonlyMDMFFileURI))
def test_mutable_version(self):
async def test_mutable_version(self) -> None:
# assert that getting parameters from the IMutableVersion object
# gives us the same data as getting them from the filenode itself
d = self.do_upload()
d.addCallback(lambda ign: self.mdmf_node.get_best_mutable_version())
def _check_mdmf(bv):
n = self.mdmf_node
self.assertThat(bv.get_writekey(), Equals(n.get_writekey()))
self.assertThat(bv.get_storage_index(), Equals(n.get_storage_index()))
self.assertFalse(bv.is_readonly())
d.addCallback(_check_mdmf)
d.addCallback(lambda ign: self.sdmf_node.get_best_mutable_version())
def _check_sdmf(bv):
n = self.sdmf_node
self.assertThat(bv.get_writekey(), Equals(n.get_writekey()))
self.assertThat(bv.get_storage_index(), Equals(n.get_storage_index()))
self.assertFalse(bv.is_readonly())
d.addCallback(_check_sdmf)
return d
await self.do_upload()
bv = await self.mdmf_node.get_best_mutable_version()
n = self.mdmf_node
self.assertThat(bv.get_writekey(), Equals(n.get_writekey()))
self.assertThat(bv.get_storage_index(), Equals(n.get_storage_index()))
self.assertFalse(bv.is_readonly())
bv = await self.sdmf_node.get_best_mutable_version()
n = self.sdmf_node
self.assertThat(bv.get_writekey(), Equals(n.get_writekey()))
self.assertThat(bv.get_storage_index(), Equals(n.get_storage_index()))
self.assertFalse(bv.is_readonly())
def test_get_readonly_version(self):
d = self.do_upload()
d.addCallback(lambda ign: self.mdmf_node.get_best_readable_version())
d.addCallback(lambda bv: self.assertTrue(bv.is_readonly()))
async def test_get_readonly_version(self) -> None:
await self.do_upload()
bv = await self.mdmf_node.get_best_readable_version()
self.assertTrue(bv.is_readonly())
# Attempting to get a mutable version of a mutable file from a
# filenode initialized with a readcap should return a readonly
# version of that same node.
d.addCallback(lambda ign: self.mdmf_node.get_readonly())
d.addCallback(lambda ro: ro.get_best_mutable_version())
d.addCallback(lambda v: self.assertTrue(v.is_readonly()))
ro = self.mdmf_node.get_readonly()
v = await ro.get_best_mutable_version()
self.assertTrue(v.is_readonly())
d.addCallback(lambda ign: self.sdmf_node.get_best_readable_version())
d.addCallback(lambda bv: self.assertTrue(bv.is_readonly()))
bv = await self.sdmf_node.get_best_readable_version()
self.assertTrue(bv.is_readonly())
d.addCallback(lambda ign: self.sdmf_node.get_readonly())
d.addCallback(lambda ro: ro.get_best_mutable_version())
d.addCallback(lambda v: self.assertTrue(v.is_readonly()))
return d
ro = self.sdmf_node.get_readonly()
v = await ro.get_best_mutable_version()
self.assertTrue(v.is_readonly())
def test_toplevel_overwrite(self):
async def test_toplevel_overwrite(self) -> None:
new_data = MutableData(b"foo bar baz" * 100000)
new_small_data = MutableData(b"foo bar baz" * 10)
d = self.do_upload()
d.addCallback(lambda ign: self.mdmf_node.overwrite(new_data))
d.addCallback(lambda ignored:
self.mdmf_node.download_best_version())
d.addCallback(lambda data:
self.assertThat(data, Equals(b"foo bar baz" * 100000)))
d.addCallback(lambda ignored:
self.sdmf_node.overwrite(new_small_data))
d.addCallback(lambda ignored:
self.sdmf_node.download_best_version())
d.addCallback(lambda data:
self.assertThat(data, Equals(b"foo bar baz" * 10)))
return d
await self.do_upload()
await self.mdmf_node.overwrite(new_data)
data = await self.mdmf_node.download_best_version()
self.assertThat(data, Equals(b"foo bar baz" * 100000))
await self.sdmf_node.overwrite(new_small_data)
data = await self.sdmf_node.download_best_version()
self.assertThat(data, Equals(b"foo bar baz" * 10))
def test_toplevel_modify(self):
d = self.do_upload()
async def test_toplevel_modify(self) -> None:
await self.do_upload()
def modifier(old_contents, servermap, first_time):
return old_contents + b"modified"
d.addCallback(lambda ign: self.mdmf_node.modify(modifier))
d.addCallback(lambda ignored:
self.mdmf_node.download_best_version())
d.addCallback(lambda data:
self.assertThat(data, Contains(b"modified")))
d.addCallback(lambda ignored:
self.sdmf_node.modify(modifier))
d.addCallback(lambda ignored:
self.sdmf_node.download_best_version())
d.addCallback(lambda data:
self.assertThat(data, Contains(b"modified")))
return d
await self.mdmf_node.modify(modifier)
data = await self.mdmf_node.download_best_version()
self.assertThat(data, Contains(b"modified"))
await self.sdmf_node.modify(modifier)
data = await self.sdmf_node.download_best_version()
self.assertThat(data, Contains(b"modified"))
def test_version_modify(self):
async def test_version_modify(self) -> None:
# TODO: When we can publish multiple versions, alter this test
# to modify a version other than the best usable version, then
# test to see that the best recoverable version is that.
d = self.do_upload()
await self.do_upload()
def modifier(old_contents, servermap, first_time):
return old_contents + b"modified"
d.addCallback(lambda ign: self.mdmf_node.modify(modifier))
d.addCallback(lambda ignored:
self.mdmf_node.download_best_version())
d.addCallback(lambda data:
self.assertThat(data, Contains(b"modified")))
d.addCallback(lambda ignored:
self.sdmf_node.modify(modifier))
d.addCallback(lambda ignored:
self.sdmf_node.download_best_version())
d.addCallback(lambda data:
self.assertThat(data, Contains(b"modified")))
return d
await self.mdmf_node.modify(modifier)
data = await self.mdmf_node.download_best_version()
self.assertThat(data, Contains(b"modified"))
await self.sdmf_node.modify(modifier)
data = await self.sdmf_node.download_best_version()
self.assertThat(data, Contains(b"modified"))
def test_download_version(self):
d = self.publish_multiple()
async def test_download_version(self) -> None:
await self.publish_multiple()
# We want to have two recoverable versions on the grid.
d.addCallback(lambda res:
self._set_versions({0:0,2:0,4:0,6:0,8:0,
1:1,3:1,5:1,7:1,9:1}))
self._set_versions({0:0,2:0,4:0,6:0,8:0,
1:1,3:1,5:1,7:1,9:1})
# Now try to download each version. We should get the plaintext
# associated with that version.
d.addCallback(lambda ignored:
self._fn.get_servermap(mode=MODE_READ))
def _got_servermap(smap):
versions = smap.recoverable_versions()
assert len(versions) == 2
smap = await self._fn.get_servermap(mode=MODE_READ)
versions = smap.recoverable_versions()
assert len(versions) == 2
self.servermap = smap
self.version1, self.version2 = versions
assert self.version1 != self.version2
self.servermap = smap
self.version1, self.version2 = versions
assert self.version1 != self.version2
self.version1_seqnum = self.version1[0]
self.version2_seqnum = self.version2[0]
self.version1_index = self.version1_seqnum - 1
self.version2_index = self.version2_seqnum - 1
self.version1_seqnum = self.version1[0]
self.version2_seqnum = self.version2[0]
self.version1_index = self.version1_seqnum - 1
self.version2_index = self.version2_seqnum - 1
d.addCallback(_got_servermap)
d.addCallback(lambda ignored:
self._fn.download_version(self.servermap, self.version1))
d.addCallback(lambda results:
self.assertThat(self.CONTENTS[self.version1_index],
Equals(results)))
d.addCallback(lambda ignored:
self._fn.download_version(self.servermap, self.version2))
d.addCallback(lambda results:
self.assertThat(self.CONTENTS[self.version2_index],
Equals(results)))
return d
results = await self._fn.download_version(self.servermap, self.version1)
self.assertThat(self.CONTENTS[self.version1_index],
Equals(results))
results = await self._fn.download_version(self.servermap, self.version2)
self.assertThat(self.CONTENTS[self.version2_index],
Equals(results))
def test_download_nonexistent_version(self):
d = self.do_upload_mdmf()
d.addCallback(lambda ign: self.mdmf_node.get_servermap(mode=MODE_WRITE))
def _set_servermap(servermap):
self.servermap = servermap
d.addCallback(_set_servermap)
d.addCallback(lambda ignored:
self.shouldFail(UnrecoverableFileError, "nonexistent version",
None,
self.mdmf_node.download_version, self.servermap,
"not a version"))
return d
async def test_download_nonexistent_version(self) -> None:
await self.do_upload_mdmf()
servermap = await self.mdmf_node.get_servermap(mode=MODE_WRITE)
await self.shouldFail(UnrecoverableFileError, "nonexistent version",
None,
self.mdmf_node.download_version, servermap,
"not a version")
def _test_partial_read(self, node, expected, modes, step):
d = node.get_best_readable_version()
async def _test_partial_read(self, node, expected, modes, step) -> None:
version = await node.get_best_readable_version()
for (name, offset, length) in modes:
d.addCallback(self._do_partial_read, name, expected, offset, length)
await self._do_partial_read(version, name, expected, offset, length)
# then read the whole thing, but only a few bytes at a time, and see
# that the results are what we expect.
def _read_data(version):
c = consumer.MemoryConsumer()
d2 = defer.succeed(None)
for i in range(0, len(expected), step):
d2.addCallback(lambda ignored, i=i: version.read(c, i, step))
d2.addCallback(lambda ignored:
self.assertThat(expected, Equals(b"".join(c.chunks))))
return d2
d.addCallback(_read_data)
return d
def _do_partial_read(self, version, name, expected, offset, length):
c = consumer.MemoryConsumer()
d = version.read(c, offset, length)
for i in range(0, len(expected), step):
await version.read(c, i, step)
self.assertThat(expected, Equals(b"".join(c.chunks)))
async def _do_partial_read(self, version, name, expected, offset, length) -> None:
c = consumer.MemoryConsumer()
await version.read(c, offset, length)
if length is None:
expected_range = expected[offset:]
else:
expected_range = expected[offset:offset+length]
d.addCallback(lambda ignored: b"".join(c.chunks))
def _check(results):
if results != expected_range:
print("read([%d]+%s) got %d bytes, not %d" % \
(offset, length, len(results), len(expected_range)))
print("got: %s ... %s" % (results[:20], results[-20:]))
print("exp: %s ... %s" % (expected_range[:20], expected_range[-20:]))
self.fail("results[%s] != expected_range" % name)
return version # daisy-chained to next call
d.addCallback(_check)
return d
results = b"".join(c.chunks)
if results != expected_range:
print("read([%d]+%s) got %d bytes, not %d" % \
(offset, length, len(results), len(expected_range)))
print("got: %r ... %r" % (results[:20], results[-20:]))
print("exp: %r ... %r" % (expected_range[:20], expected_range[-20:]))
self.fail("results[%s] != expected_range" % name)
def test_partial_read_mdmf_0(self):
async def test_partial_read_mdmf_0(self) -> None:
data = b""
d = self.do_upload_mdmf(data=data)
result = await self.do_upload_mdmf(data=data)
modes = [("all1", 0,0),
("all2", 0,None),
]
d.addCallback(self._test_partial_read, data, modes, 1)
return d
await self._test_partial_read(result, data, modes, 1)
def test_partial_read_mdmf_large(self):
async def test_partial_read_mdmf_large(self) -> None:
segment_boundary = mathutil.next_multiple(128 * 1024, 3)
modes = [("start_on_segment_boundary", segment_boundary, 50),
("ending_one_byte_after_segment_boundary", segment_boundary-50, 51),
@ -393,20 +311,18 @@ class Version(GridTestMixin, AsyncTestCase, testutil.ShouldFailMixin, \
("complete_file1", 0, len(self.data)),
("complete_file2", 0, None),
]
d = self.do_upload_mdmf()
d.addCallback(self._test_partial_read, self.data, modes, 10000)
return d
result = await self.do_upload_mdmf()
await self._test_partial_read(result, self.data, modes, 10000)
def test_partial_read_sdmf_0(self):
async def test_partial_read_sdmf_0(self) -> None:
data = b""
modes = [("all1", 0,0),
("all2", 0,None),
]
d = self.do_upload_sdmf(data=data)
d.addCallback(self._test_partial_read, data, modes, 1)
return d
result = await self.do_upload_sdmf(data=data)
await self._test_partial_read(result, data, modes, 1)
def test_partial_read_sdmf_2(self):
async def test_partial_read_sdmf_2(self) -> None:
data = b"hi"
modes = [("one_byte", 0, 1),
("last_byte", 1, 1),
@ -414,11 +330,10 @@ class Version(GridTestMixin, AsyncTestCase, testutil.ShouldFailMixin, \
("complete_file", 0, 2),
("complete_file2", 0, None),
]
d = self.do_upload_sdmf(data=data)
d.addCallback(self._test_partial_read, data, modes, 1)
return d
result = await self.do_upload_sdmf(data=data)
await self._test_partial_read(result, data, modes, 1)
def test_partial_read_sdmf_90(self):
async def test_partial_read_sdmf_90(self) -> None:
modes = [("start_at_middle", 50, 40),
("start_at_middle2", 50, None),
("zero_length_at_start", 0, 0),
@ -427,11 +342,10 @@ class Version(GridTestMixin, AsyncTestCase, testutil.ShouldFailMixin, \
("complete_file1", 0, None),
("complete_file2", 0, 90),
]
d = self.do_upload_sdmf()
d.addCallback(self._test_partial_read, self.small_data, modes, 10)
return d
result = await self.do_upload_sdmf()
await self._test_partial_read(result, self.small_data, modes, 10)
def test_partial_read_sdmf_100(self):
async def test_partial_read_sdmf_100(self) -> None:
data = b"test data "*10
modes = [("start_at_middle", 50, 50),
("start_at_middle2", 50, None),
@ -440,42 +354,30 @@ class Version(GridTestMixin, AsyncTestCase, testutil.ShouldFailMixin, \
("complete_file1", 0, 100),
("complete_file2", 0, None),
]
d = self.do_upload_sdmf(data=data)
d.addCallback(self._test_partial_read, data, modes, 10)
return d
result = await self.do_upload_sdmf(data=data)
await self._test_partial_read(result, data, modes, 10)
async def _test_read_and_download(self, node, expected) -> None:
version = await node.get_best_readable_version()
c = consumer.MemoryConsumer()
await version.read(c)
self.assertThat(expected, Equals(b"".join(c.chunks)))
def _test_read_and_download(self, node, expected):
d = node.get_best_readable_version()
def _read_data(version):
c = consumer.MemoryConsumer()
c2 = consumer.MemoryConsumer()
d2 = defer.succeed(None)
d2.addCallback(lambda ignored: version.read(c))
d2.addCallback(lambda ignored:
self.assertThat(expected, Equals(b"".join(c.chunks))))
c2 = consumer.MemoryConsumer()
await version.read(c2, offset=0, size=len(expected))
self.assertThat(expected, Equals(b"".join(c2.chunks)))
d2.addCallback(lambda ignored: version.read(c2, offset=0,
size=len(expected)))
d2.addCallback(lambda ignored:
self.assertThat(expected, Equals(b"".join(c2.chunks))))
return d2
d.addCallback(_read_data)
d.addCallback(lambda ignored: node.download_best_version())
d.addCallback(lambda data: self.assertThat(expected, Equals(data)))
return d
data = await node.download_best_version()
self.assertThat(expected, Equals(data))
def test_read_and_download_mdmf(self):
d = self.do_upload_mdmf()
d.addCallback(self._test_read_and_download, self.data)
return d
async def test_read_and_download_mdmf(self) -> None:
result = await self.do_upload_mdmf()
await self._test_read_and_download(result, self.data)
def test_read_and_download_sdmf(self):
d = self.do_upload_sdmf()
d.addCallback(self._test_read_and_download, self.small_data)
return d
async def test_read_and_download_sdmf(self) -> None:
result = await self.do_upload_sdmf()
await self._test_read_and_download(result, self.small_data)
def test_read_and_download_sdmf_zero_length(self):
d = self.do_upload_empty_sdmf()
d.addCallback(self._test_read_and_download, b"")
return d
async def test_read_and_download_sdmf_zero_length(self) -> None:
result = await self.do_upload_empty_sdmf()
await self._test_read_and_download(result, b"")

View File

@ -1,198 +0,0 @@
"""
Tests for allmydata.util.pipeline.
Ported to Python 3.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
import gc
from twisted.internet import defer
from twisted.trial import unittest
from twisted.python import log
from twisted.python.failure import Failure
from allmydata.util import pipeline
class Pipeline(unittest.TestCase):
def pause(self, *args, **kwargs):
d = defer.Deferred()
self.calls.append( (d, args, kwargs) )
return d
def failUnlessCallsAre(self, expected):
#print(self.calls)
#print(expected)
self.failUnlessEqual(len(self.calls), len(expected), self.calls)
for i,c in enumerate(self.calls):
self.failUnlessEqual(c[1:], expected[i], str(i))
def test_basic(self):
self.calls = []
finished = []
p = pipeline.Pipeline(100)
d = p.flush() # fires immediately
d.addCallbacks(finished.append, log.err)
self.failUnlessEqual(len(finished), 1)
finished = []
d = p.add(10, self.pause, "one")
# the call should start right away, and our return Deferred should
# fire right away
d.addCallbacks(finished.append, log.err)
self.failUnlessEqual(len(finished), 1)
self.failUnlessEqual(finished[0], None)
self.failUnlessCallsAre([ ( ("one",) , {} ) ])
self.failUnlessEqual(p.gauge, 10)
# pipeline: [one]
finished = []
d = p.add(20, self.pause, "two", kw=2)
# pipeline: [one, two]
# the call and the Deferred should fire right away
d.addCallbacks(finished.append, log.err)
self.failUnlessEqual(len(finished), 1)
self.failUnlessEqual(finished[0], None)
self.failUnlessCallsAre([ ( ("one",) , {} ),
( ("two",) , {"kw": 2} ),
])
self.failUnlessEqual(p.gauge, 30)
self.calls[0][0].callback("one-result")
# pipeline: [two]
self.failUnlessEqual(p.gauge, 20)
finished = []
d = p.add(90, self.pause, "three", "posarg1")
# pipeline: [two, three]
flushed = []
fd = p.flush()
fd.addCallbacks(flushed.append, log.err)
self.failUnlessEqual(flushed, [])
# the call will be made right away, but the return Deferred will not,
# because the pipeline is now full.
d.addCallbacks(finished.append, log.err)
self.failUnlessEqual(len(finished), 0)
self.failUnlessCallsAre([ ( ("one",) , {} ),
( ("two",) , {"kw": 2} ),
( ("three", "posarg1"), {} ),
])
self.failUnlessEqual(p.gauge, 110)
self.failUnlessRaises(pipeline.SingleFileError, p.add, 10, self.pause)
# retiring either call will unblock the pipeline, causing the #3
# Deferred to fire
self.calls[2][0].callback("three-result")
# pipeline: [two]
self.failUnlessEqual(len(finished), 1)
self.failUnlessEqual(finished[0], None)
self.failUnlessEqual(flushed, [])
# retiring call#2 will finally allow the flush() Deferred to fire
self.calls[1][0].callback("two-result")
self.failUnlessEqual(len(flushed), 1)
def test_errors(self):
self.calls = []
p = pipeline.Pipeline(100)
d1 = p.add(200, self.pause, "one")
d2 = p.flush()
finished = []
d1.addBoth(finished.append)
self.failUnlessEqual(finished, [])
flushed = []
d2.addBoth(flushed.append)
self.failUnlessEqual(flushed, [])
self.calls[0][0].errback(ValueError("oops"))
self.failUnlessEqual(len(finished), 1)
f = finished[0]
self.failUnless(isinstance(f, Failure))
self.failUnless(f.check(pipeline.PipelineError))
self.failUnlessIn("PipelineError", str(f.value))
self.failUnlessIn("ValueError", str(f.value))
r = repr(f.value)
self.failUnless("ValueError" in r, r)
f2 = f.value.error
self.failUnless(f2.check(ValueError))
self.failUnlessEqual(len(flushed), 1)
f = flushed[0]
self.failUnless(isinstance(f, Failure))
self.failUnless(f.check(pipeline.PipelineError))
f2 = f.value.error
self.failUnless(f2.check(ValueError))
# now that the pipeline is in the failed state, any new calls will
# fail immediately
d3 = p.add(20, self.pause, "two")
finished = []
d3.addBoth(finished.append)
self.failUnlessEqual(len(finished), 1)
f = finished[0]
self.failUnless(isinstance(f, Failure))
self.failUnless(f.check(pipeline.PipelineError))
r = repr(f.value)
self.failUnless("ValueError" in r, r)
f2 = f.value.error
self.failUnless(f2.check(ValueError))
d4 = p.flush()
flushed = []
d4.addBoth(flushed.append)
self.failUnlessEqual(len(flushed), 1)
f = flushed[0]
self.failUnless(isinstance(f, Failure))
self.failUnless(f.check(pipeline.PipelineError))
f2 = f.value.error
self.failUnless(f2.check(ValueError))
def test_errors2(self):
self.calls = []
p = pipeline.Pipeline(100)
d1 = p.add(10, self.pause, "one")
d2 = p.add(20, self.pause, "two")
d3 = p.add(30, self.pause, "three")
d4 = p.flush()
# one call fails, then the second one succeeds: make sure
# ExpandableDeferredList tolerates the second one
flushed = []
d4.addBoth(flushed.append)
self.failUnlessEqual(flushed, [])
self.calls[0][0].errback(ValueError("oops"))
self.failUnlessEqual(len(flushed), 1)
f = flushed[0]
self.failUnless(isinstance(f, Failure))
self.failUnless(f.check(pipeline.PipelineError))
f2 = f.value.error
self.failUnless(f2.check(ValueError))
self.calls[1][0].callback("two-result")
self.calls[2][0].errback(ValueError("three-error"))
del d1,d2,d3,d4
gc.collect() # for PyPy

View File

@ -47,6 +47,9 @@ from twisted.internet.defer import (
inlineCallbacks,
DeferredList,
)
from twisted.internet.testing import (
MemoryReactorClock,
)
from twisted.python.filepath import FilePath
from allmydata.util import fileutil, pollmixin
from allmydata.util.encodingutil import unicode_to_argv
@ -60,6 +63,9 @@ import allmydata
from allmydata.scripts.runner import (
parse_options,
)
from allmydata.scripts.tahoe_run import (
on_stdin_close,
)
from .common import (
PIPE,
@ -624,6 +630,64 @@ class RunNode(common_util.SignalMixin, unittest.TestCase, pollmixin.PollMixin):
yield client_running
def _simulate_windows_stdin_close(stdio):
"""
on Unix we can just close all the readers, correctly "simulating"
a stdin close .. of course, Windows has to be difficult
"""
stdio.writeConnectionLost()
stdio.readConnectionLost()
class OnStdinCloseTests(SyncTestCase):
"""
Tests for on_stdin_close
"""
def test_close_called(self):
"""
our on-close method is called when stdin closes
"""
reactor = MemoryReactorClock()
called = []
def onclose():
called.append(True)
transport = on_stdin_close(reactor, onclose)
self.assertEqual(called, [])
if platform.isWindows():
_simulate_windows_stdin_close(transport)
else:
for reader in reactor.getReaders():
reader.loseConnection()
reactor.advance(1) # ProcessReader does a callLater(0, ..)
self.assertEqual(called, [True])
def test_exception_ignored(self):
"""
An exception from our on-close function is discarded.
"""
reactor = MemoryReactorClock()
called = []
def onclose():
called.append(True)
raise RuntimeError("unexpected error")
transport = on_stdin_close(reactor, onclose)
self.assertEqual(called, [])
if platform.isWindows():
_simulate_windows_stdin_close(transport)
else:
for reader in reactor.getReaders():
reader.loseConnection()
reactor.advance(1) # ProcessReader does a callLater(0, ..)
self.assertEqual(called, [True])
class PidFileLocking(SyncTestCase):
"""
Direct tests for allmydata.util.pid functions

View File

@ -3,14 +3,9 @@ Tests for allmydata.storage.
Ported to Python 3.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import native_str, PY2, bytes_to_native_str, bchr
if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
from __future__ import annotations
from future.utils import native_str, bytes_to_native_str, bchr
from six import ensure_str
from io import (
@ -59,7 +54,7 @@ from allmydata.storage.common import storage_index_to_dir, \
si_b2a, si_a2b
from allmydata.storage.lease import LeaseInfo
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
ReadBucketProxy
ReadBucketProxy, _WriteBuffer
from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \
LayoutInvalid, MDMFSIGNABLEHEADER, \
SIGNED_PREFIX, MDMFHEADER, \
@ -3746,3 +3741,39 @@ class LeaseInfoTests(SyncTestCase):
info.to_mutable_data(),
HasLength(info.mutable_size()),
)
class WriteBufferTests(SyncTestCase):
"""Tests for ``_WriteBuffer``."""
@given(
small_writes=strategies.lists(
strategies.binary(min_size=1, max_size=20),
min_size=10, max_size=20),
batch_size=strategies.integers(min_value=5, max_value=10)
)
def test_write_buffer(self, small_writes: list[bytes], batch_size: int):
"""
``_WriteBuffer`` coalesces small writes into bigger writes based on
the batch size.
"""
wb = _WriteBuffer(batch_size)
result = b""
for data in small_writes:
should_flush = wb.queue_write(data)
if should_flush:
flushed_offset, flushed_data = wb.flush()
self.assertEqual(flushed_offset, len(result))
# The flushed data is in batch sizes, or closest approximation
# given queued inputs:
self.assertTrue(batch_size <= len(flushed_data) < batch_size + len(data))
result += flushed_data
# Final flush:
remaining_length = wb.get_queued_bytes()
flushed_offset, flushed_data = wb.flush()
self.assertEqual(remaining_length, len(flushed_data))
self.assertEqual(flushed_offset, len(result))
result += flushed_data
self.assertEqual(result, b"".join(small_writes))

View File

@ -9,18 +9,7 @@
"""
Tests for the allmydata.testing helpers
Ported to Python 3.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
from twisted.internet.defer import (
inlineCallbacks,
@ -56,10 +45,12 @@ from testtools.matchers import (
IsInstance,
MatchesStructure,
AfterPreprocessing,
Contains,
)
from testtools.twistedsupport import (
succeeded,
)
from twisted.web.http import GONE
class FakeWebTest(SyncTestCase):
@ -144,7 +135,8 @@ class FakeWebTest(SyncTestCase):
def test_download_missing(self):
"""
Error if we download a capability that doesn't exist
The response to a request to download a capability that doesn't exist
is 410 (GONE).
"""
http_client = create_tahoe_treq_client()
@ -157,7 +149,11 @@ class FakeWebTest(SyncTestCase):
resp,
succeeded(
MatchesStructure(
code=Equals(500)
code=Equals(GONE),
content=AfterPreprocessing(
lambda m: m(),
succeeded(Contains(b"No data for")),
),
)
)
)

View File

@ -6,20 +6,12 @@
# This file is part of Tahoe-LAFS.
#
# See the docs/about.rst file for licensing information.
"""Test-helpers for clients that use the WebUI.
Ported to Python 3.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
Test-helpers for clients that use the WebUI.
"""
from __future__ import annotations
import hashlib
@ -54,6 +46,7 @@ import allmydata.uri
from allmydata.util import (
base32,
)
from ..util.dictutil import BytesKeyDict
__all__ = (
@ -147,7 +140,7 @@ class _FakeTahoeUriHandler(Resource, object):
isLeaf = True
data = attr.ib(default=attr.Factory(dict))
data: BytesKeyDict = attr.ib(default=attr.Factory(BytesKeyDict))
capability_generators = attr.ib(default=attr.Factory(dict))
def _generate_capability(self, kind):
@ -209,7 +202,7 @@ class _FakeTahoeUriHandler(Resource, object):
capability = None
for arg, value in uri.query:
if arg == u"uri":
capability = value
capability = value.encode("utf-8")
# it's legal to use the form "/uri/<capability>"
if capability is None and request.postpath and request.postpath[0]:
capability = request.postpath[0]
@ -221,10 +214,9 @@ class _FakeTahoeUriHandler(Resource, object):
# the user gave us a capability; if our Grid doesn't have any
# data for it, that's an error.
capability = capability.encode('ascii')
if capability not in self.data:
request.setResponseCode(http.BAD_REQUEST)
return u"No data for '{}'".format(capability.decode('ascii'))
request.setResponseCode(http.GONE)
return u"No data for '{}'".format(capability.decode('ascii')).encode("utf-8")
return self.data[capability]

View File

@ -1,21 +1,6 @@
"""
Tools to mess with dicts.
Ported to Python 3.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
# IMPORTANT: We deliberately don't import dict. The issue is that we're
# subclassing dict, so we'd end up exposing Python 3 dict APIs to lots of
# code that doesn't support it.
from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, list, object, range, str, max, min # noqa: F401
from six import ensure_str
class DictOfSets(dict):
def add(self, key, value):
@ -104,7 +89,7 @@ def _make_enforcing_override(K, method_name):
raise TypeError("{} must be of type {}".format(
repr(key), self.KEY_TYPE))
return getattr(dict, method_name)(self, key, *args, **kwargs)
f.__name__ = ensure_str(method_name)
f.__name__ = method_name
setattr(K, method_name, f)
for _method_name in ["__setitem__", "__getitem__", "setdefault", "get",
@ -113,18 +98,13 @@ for _method_name in ["__setitem__", "__getitem__", "setdefault", "get",
del _method_name
if PY2:
# No need for enforcement, can use either bytes or unicode as keys and it's
# fine.
BytesKeyDict = UnicodeKeyDict = dict
else:
class BytesKeyDict(_TypedKeyDict):
"""Keys should be bytes."""
class BytesKeyDict(_TypedKeyDict):
"""Keys should be bytes."""
KEY_TYPE = bytes
KEY_TYPE = bytes
class UnicodeKeyDict(_TypedKeyDict):
"""Keys should be unicode strings."""
class UnicodeKeyDict(_TypedKeyDict):
"""Keys should be unicode strings."""
KEY_TYPE = str
KEY_TYPE = str
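A minimal sketch of the key-type enforcement these classes now provide unconditionally (illustrative only; assumes allmydata is importable):

    from allmydata.util.dictutil import BytesKeyDict

    d = BytesKeyDict()
    d[b"cap"] = b"data"      # bytes keys are accepted
    try:
        d["cap"] = b"data"   # a text key is rejected
    except TypeError as err:
        print(err)           # 'cap' must be of type <class 'bytes'>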

View File

@ -1,149 +0,0 @@
"""
A pipeline of Deferreds.
Ported to Python 3.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
from twisted.internet import defer
from twisted.python.failure import Failure
from twisted.python import log
from allmydata.util.assertutil import precondition
class PipelineError(Exception):
"""One of the pipelined messages returned an error. The received Failure
object is stored in my .error attribute."""
def __init__(self, error):
self.error = error
def __repr__(self):
return "<PipelineError error=(%r)>" % (self.error,)
def __str__(self):
return "<PipelineError error=(%s)>" % (self.error,)
class SingleFileError(Exception):
"""You are not permitted to add a job to a full pipeline."""
class ExpandableDeferredList(defer.Deferred, object):
# like DeferredList(fireOnOneErrback=True) with a built-in
# gatherResults(), but you can add new Deferreds until you close it. This
# gives you a chance to add don't-complain-about-unhandled-error errbacks
# immediately after attachment, regardless of whether you actually end up
# wanting the list or not.
def __init__(self):
defer.Deferred.__init__(self)
self.resultsReceived = 0
self.resultList = []
self.failure = None
self.closed = False
def addDeferred(self, d):
precondition(not self.closed, "don't call addDeferred() on a closed ExpandableDeferredList")
index = len(self.resultList)
self.resultList.append(None)
d.addCallbacks(self._cbDeferred, self._ebDeferred,
callbackArgs=(index,))
return d
def close(self):
self.closed = True
self.checkForFinished()
def checkForFinished(self):
if not self.closed:
return
if self.called:
return
if self.failure:
self.errback(self.failure)
elif self.resultsReceived == len(self.resultList):
self.callback(self.resultList)
def _cbDeferred(self, res, index):
self.resultList[index] = res
self.resultsReceived += 1
self.checkForFinished()
return res
def _ebDeferred(self, f):
self.failure = f
self.checkForFinished()
return f
class Pipeline(object):
"""I manage a size-limited pipeline of Deferred operations, usually
callRemote() messages."""
def __init__(self, capacity):
self.capacity = capacity # how full we can be
self.gauge = 0 # how full we are
self.failure = None
self.waiting = [] # callers of add() who are blocked
self.unflushed = ExpandableDeferredList()
def add(self, _size, _func, *args, **kwargs):
# We promise that all the Deferreds we return will fire in the order
# they were returned. To make it easier to keep this promise, we
# prohibit multiple outstanding calls to add() .
if self.waiting:
raise SingleFileError
if self.failure:
return defer.fail(self.failure)
self.gauge += _size
fd = defer.maybeDeferred(_func, *args, **kwargs)
fd.addBoth(self._call_finished, _size)
self.unflushed.addDeferred(fd)
fd.addErrback(self._eat_pipeline_errors)
fd.addErrback(log.err, "_eat_pipeline_errors didn't eat it")
if self.gauge < self.capacity:
return defer.succeed(None)
d = defer.Deferred()
self.waiting.append(d)
return d
def flush(self):
if self.failure:
return defer.fail(self.failure)
d, self.unflushed = self.unflushed, ExpandableDeferredList()
d.close()
d.addErrback(self._flushed_error)
return d
def _flushed_error(self, f):
precondition(self.failure) # should have been set by _call_finished
return self.failure
def _call_finished(self, res, size):
self.gauge -= size
if isinstance(res, Failure):
res = Failure(PipelineError(res))
if not self.failure:
self.failure = res
if self.failure:
while self.waiting:
d = self.waiting.pop(0)
d.errback(self.failure)
else:
while self.waiting and (self.gauge < self.capacity):
d = self.waiting.pop(0)
d.callback(None)
# the d.callback() might trigger a new call to add(), which
# will raise our gauge and might cause the pipeline to be
# filled. So the while() loop gets a chance to tell the
# caller to stop.
return res
def _eat_pipeline_errors(self, f):
f.trap(PipelineError)
return None

View File

@ -86,7 +86,6 @@ commands =
coverage: python -b -m coverage run -m twisted.trial {env:TAHOE_LAFS_TRIAL_ARGS:--rterrors --reporter=timing} {posargs:{env:TEST_SUITE}}
coverage: coverage combine
coverage: coverage xml
coverage: coverage report
[testenv:integration]
basepython = python3
@ -99,7 +98,6 @@ commands =
# NOTE: 'run with "py.test --keep-tempdir -s -v integration/" to debug failures'
py.test --timeout=1800 --coverage -s -v {posargs:integration}
coverage combine
coverage report
[testenv:codechecks]