Merge branch 'master' into clean-up-tor-and-i2p-providers

commit 71134db007
Itamar Turner-Trauring, 2023-07-11 15:45:39 -04:00 (committed via GitHub)
68 changed files with 1076 additions and 450 deletions


@ -53,9 +53,9 @@ jobs:
- "3.11"
include:
# On macOS don't bother with 3.8, just to get faster builds.
- os: macos-latest
- os: macos-12
python-version: "3.9"
- os: macos-latest
- os: macos-12
python-version: "3.11"
# We only support PyPy on Linux at the moment.
- os: ubuntu-latest
@ -165,16 +165,16 @@ jobs:
fail-fast: false
matrix:
include:
- os: macos-latest
python-version: "3.9"
- os: macos-12
python-version: "3.11"
force-foolscap: false
- os: windows-latest
python-version: "3.9"
python-version: "3.11"
force-foolscap: false
# 22.04 has some issue with Tor at the moment:
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3943
- os: ubuntu-20.04
python-version: "3.11"
python-version: "3.10"
force-foolscap: false
steps:
@ -248,7 +248,7 @@ jobs:
fail-fast: false
matrix:
os:
- macos-10.15
- macos-12
- windows-latest
- ubuntu-latest
python-version:


@ -1,5 +1,10 @@
version: 2
build:
os: ubuntu-22.04
tools:
python: "3.10"
python:
install:
- requirements: docs/requirements.txt


@ -9,4 +9,10 @@ select = [
# Make sure we bind closure variables in a loop (equivalent to pylint
# cell-var-from-loop):
"B023",
# Don't silence exceptions in finally by accident:
"B012",
# Don't use mutable default arguments:
"B006",
# Errors from PyLint:
"PLE",
]
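Several hunks later in this diff apply exactly the fix that B006 demands: replacing a mutable default argument with a None sentinel. A minimal illustration of the pitfall (example code, not from the repository):

def append_bad(item, bucket=[]):
    # The [] is evaluated once, at definition time, so every call
    # without an explicit bucket shares the same list.
    bucket.append(item)
    return bucket

def append_good(item, bucket=None):
    # The None-sentinel pattern used throughout this commit.
    if bucket is None:
        bucket = []
    bucket.append(item)
    return bucket

assert append_bad(1) == [1]
assert append_bad(2) == [1, 2]   # state leaked from the first call
assert append_good(1) == [1]
assert append_good(2) == [2]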


@ -50,9 +50,9 @@ from .util import (
)
from allmydata.node import read_config
# No reason for HTTP requests to take longer than two minutes in the
# No reason for HTTP requests to take longer than four minutes in the
# integration tests. See allmydata/scripts/common_http.py for usage.
os.environ["__TAHOE_CLI_HTTP_TIMEOUT"] = "120"
os.environ["__TAHOE_CLI_HTTP_TIMEOUT"] = "240"
# Make Foolscap logging go into Twisted logging, so that integration test logs
# include extra information
@ -279,7 +279,7 @@ def introducer_furl(introducer, temp_dir):
return furl
@pytest.fixture(scope='session')
@pytest.fixture
@log_call(
action_type=u"integration:tor:introducer",
include_args=["temp_dir", "flog_gatherer"],
@ -342,7 +342,7 @@ def tor_introducer(reactor, temp_dir, flog_gatherer, request):
return transport
@pytest.fixture(scope='session')
@pytest.fixture
def tor_introducer_furl(tor_introducer, temp_dir):
furl_fname = join(temp_dir, 'introducer_tor', 'private', 'introducer.furl')
while not exists(furl_fname):
@ -401,9 +401,6 @@ def alice(
reactor, request, temp_dir, introducer_furl, flog_gatherer, "alice",
web_port="tcp:9980:interface=localhost",
storage=False,
# We're going to kill this ourselves, so no need for finalizer to
# do it:
finalize=False,
)
)
pytest_twisted.blockon(await_client_ready(process))


@ -6,8 +6,9 @@ and stdout.
from subprocess import Popen, PIPE, check_output, check_call
import pytest
from pytest_twisted import ensureDeferred
from twisted.internet import reactor
from twisted.internet.threads import blockingCallFromThread
from twisted.internet.defer import Deferred
from .util import run_in_thread, cli, reconfigure
@ -86,8 +87,8 @@ def test_large_file(alice, get_put_alias, tmp_path):
assert outfile.read_bytes() == tempfile.read_bytes()
@ensureDeferred
async def test_upload_download_immutable_different_default_max_segment_size(alice, get_put_alias, tmpdir, request):
@run_in_thread
def test_upload_download_immutable_different_default_max_segment_size(alice, get_put_alias, tmpdir, request):
"""
Tahoe-LAFS used to have a default max segment size of 128KB; it is now
1MB. Test that an upload created when 128KB was the default can be
@ -100,22 +101,25 @@ async def test_upload_download_immutable_different_default_max_segment_size(alic
with tempfile.open("wb") as f:
f.write(large_data)
async def set_segment_size(segment_size):
await reconfigure(
def set_segment_size(segment_size):
return blockingCallFromThread(
reactor,
request,
alice,
(1, 1, 1),
None,
max_segment_size=segment_size
)
lambda: Deferred.fromCoroutine(reconfigure(
reactor,
request,
alice,
(1, 1, 1),
None,
max_segment_size=segment_size
))
)
# 1. Upload file 1 with default segment size set to 1MB
await set_segment_size(1024 * 1024)
set_segment_size(1024 * 1024)
cli(alice, "put", str(tempfile), "getput:seg1024kb")
# 2. Download file 1 with default segment size set to 128KB
await set_segment_size(128 * 1024)
set_segment_size(128 * 1024)
assert large_data == check_output(
["tahoe", "--node-directory", alice.node_dir, "get", "getput:seg1024kb", "-"]
)
@ -124,7 +128,7 @@ async def test_upload_download_immutable_different_default_max_segment_size(alic
cli(alice, "put", str(tempfile), "getput:seg128kb")
# 4. Download file 2 with default segment size set to 1MB
await set_segment_size(1024 * 1024)
set_segment_size(1024 * 1024)
assert large_data == check_output(
["tahoe", "--node-directory", alice.node_dir, "get", "getput:seg128kb", "-"]
)
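The set_segment_size helper above shows the general bridge this test now relies on: code in a worker thread (courtesy of run_in_thread) that must run a coroutine on the reactor and block until it finishes. A standalone sketch of that bridge, with illustrative names:

from twisted.internet.defer import Deferred
from twisted.internet.threads import blockingCallFromThread

def run_coroutine_from_thread(reactor, coro_fn, *args, **kwargs):
    # blockingCallFromThread schedules the callable on the reactor
    # thread and blocks this (non-reactor) thread until the resulting
    # Deferred fires.
    return blockingCallFromThread(
        reactor,
        # Deferred.fromCoroutine must itself run on the reactor thread,
        # hence the lambda instead of passing the coroutine directly.
        lambda: Deferred.fromCoroutine(coro_fn(*args, **kwargs)),
    )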


@ -19,6 +19,7 @@ from allmydata.test.common import (
write_introducer,
)
from allmydata.client import read_config
from allmydata.util.deferredutil import async_to_deferred
# see "conftest.py" for the fixtures (e.g. "tor_network")
@ -31,13 +32,26 @@ if sys.platform.startswith('win'):
@pytest_twisted.inlineCallbacks
def test_onion_service_storage(reactor, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl):
carol = yield _create_anonymous_node(reactor, 'carol', 8008, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl)
dave = yield _create_anonymous_node(reactor, 'dave', 8009, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl)
"""
Two nodes and an introducer all configured to use Tor.
The two nodes can talk to the introducer and each other: we upload to one
node, read from the other.
"""
carol = yield _create_anonymous_node(reactor, 'carol', 8008, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl, 2)
dave = yield _create_anonymous_node(reactor, 'dave', 8009, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl, 2)
yield util.await_client_ready(carol, minimum_number_of_servers=2, timeout=600)
yield util.await_client_ready(dave, minimum_number_of_servers=2, timeout=600)
yield upload_to_one_download_from_the_other(reactor, temp_dir, carol, dave)
@async_to_deferred
async def upload_to_one_download_from_the_other(reactor, temp_dir, upload_to: util.TahoeProcess, download_from: util.TahoeProcess):
"""
Ensure both nodes are connected to "a grid" by uploading something via one
node, and retrieve it using the other.
"""
# ensure both nodes are connected to "a grid" by uploading
# something via carol, and retrieve it using dave.
gold_path = join(temp_dir, "gold")
with open(gold_path, "w") as f:
f.write(
@ -54,12 +68,12 @@ def test_onion_service_storage(reactor, request, temp_dir, flog_gatherer, tor_ne
sys.executable,
(
sys.executable, '-b', '-m', 'allmydata.scripts.runner',
'-d', join(temp_dir, 'carol'),
'-d', upload_to.node_dir,
'put', gold_path,
),
env=environ,
)
yield proto.done
await proto.done
cap = proto.output.getvalue().strip().split()[-1]
print("capability: {}".format(cap))
@ -69,19 +83,18 @@ def test_onion_service_storage(reactor, request, temp_dir, flog_gatherer, tor_ne
sys.executable,
(
sys.executable, '-b', '-m', 'allmydata.scripts.runner',
'-d', join(temp_dir, 'dave'),
'-d', download_from.node_dir,
'get', cap,
),
env=environ,
)
yield proto.done
dave_got = proto.output.getvalue().strip()
assert dave_got == open(gold_path, 'rb').read().strip()
await proto.done
download_got = proto.output.getvalue().strip()
assert download_got == open(gold_path, 'rb').read().strip()
@pytest_twisted.inlineCallbacks
def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_gatherer, tor_network, introducer_furl):
def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_gatherer, tor_network, introducer_furl, shares_total: int) -> util.TahoeProcess:
node_dir = FilePath(temp_dir).child(name)
web_port = "tcp:{}:interface=localhost".format(control_port + 2000)
@ -103,7 +116,7 @@ def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_
'--listen', 'tor',
'--shares-needed', '1',
'--shares-happy', '1',
'--shares-total', '2',
'--shares-total', str(shares_total),
node_dir.path,
),
env=environ,
@ -113,9 +126,9 @@ def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_
# Which services should this client connect to?
write_introducer(node_dir, "default", introducer_furl)
util.basic_node_configuration(request, flog_gatherer, node_dir.path)
config = read_config(node_dir.path, "tub.port")
config.set_config("node", "log_gatherer.furl", flog_gatherer)
config.set_config("tor", "onion", "true")
config.set_config("tor", "onion.external_port", "3457")
config.set_config("tor", "control.port", f"tcp:port={control_port}:host=127.0.0.1")
@ -125,3 +138,26 @@ def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_
result = yield util._run_node(reactor, node_dir.path, request, None)
print("okay, launched")
return result
@pytest.mark.skipif(sys.platform.startswith('darwin'), reason='This test has issues on macOS')
@pytest_twisted.inlineCallbacks
def test_anonymous_client(reactor, request, temp_dir, flog_gatherer, tor_network, introducer_furl):
"""
A normal node (normie) and a normal introducer are configured, along with
one node (anonymoose) configured to be anonymous by talking via Tor.
Anonymoose should be able to communicate with normie.
TODO how to ensure that anonymoose is actually using Tor?
"""
normie = yield util._create_node(
reactor, request, temp_dir, introducer_furl, flog_gatherer, "normie",
web_port="tcp:9989:interface=localhost",
storage=True, needed=1, happy=1, total=1,
)
yield util.await_client_ready(normie)
anonymoose = yield _create_anonymous_node(reactor, 'anonymoose', 8008, request, temp_dir, flog_gatherer, tor_network, introducer_furl, 1)
yield util.await_client_ready(anonymoose, minimum_number_of_servers=1, timeout=600)
yield upload_to_one_download_from_the_other(reactor, temp_dir, normie, anonymoose)
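upload_to_one_download_from_the_other is decorated with async_to_deferred so that inlineCallbacks-style tests can simply yield it. A rough sketch of what such a decorator does (assumed shape; the real implementation lives in allmydata.util.deferredutil):

from functools import wraps
from twisted.internet.defer import Deferred

def async_to_deferred(f):
    # Wrap an async-def function so callers get a Twisted Deferred.
    @wraps(f)
    def wrapper(*args, **kwargs):
        return Deferred.fromCoroutine(f(*args, **kwargs))
    return wrapper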


@ -14,6 +14,8 @@ from __future__ import annotations
import time
from urllib.parse import unquote as url_unquote, quote as url_quote
from twisted.internet.threads import deferToThread
import allmydata.uri
from allmydata.util import jsonbytes as json
@ -24,7 +26,7 @@ import requests
import html5lib
from bs4 import BeautifulSoup
from pytest_twisted import ensureDeferred
import pytest_twisted
@run_in_thread
def test_index(alice):
@ -185,7 +187,7 @@ def test_deep_stats(alice):
time.sleep(.5)
@util.run_in_thread
@run_in_thread
def test_status(alice):
"""
confirm we get something sensible from /status and the various sub-types
@ -251,7 +253,7 @@ def test_status(alice):
assert found_download, "Failed to find the file we downloaded in the status-page"
@ensureDeferred
@pytest_twisted.ensureDeferred
async def test_directory_deep_check(reactor, request, alice):
"""
use deep-check and confirm the result pages work
@ -263,7 +265,10 @@ async def test_directory_deep_check(reactor, request, alice):
total = 4
await util.reconfigure(reactor, request, alice, (happy, required, total), convergence=None)
await deferToThread(_test_directory_deep_check_blocking, alice)
def _test_directory_deep_check_blocking(alice):
# create a directory
resp = requests.post(
util.node_url(alice.node_dir, u"uri"),


@ -140,7 +140,8 @@ class _MagicTextProtocol(ProcessProtocol):
def outReceived(self, data):
data = str(data, sys.stdout.encoding)
sys.stdout.write(self.name + data)
for line in data.splitlines():
sys.stdout.write(self.name + line + "\n")
self._output.write(data)
if not self.magic_seen.called and self._magic_text in self._output.getvalue():
print("Saw '{}' in the logs".format(self._magic_text))
@ -148,7 +149,8 @@ class _MagicTextProtocol(ProcessProtocol):
def errReceived(self, data):
data = str(data, sys.stderr.encoding)
sys.stdout.write(self.name + data)
for line in data.splitlines():
sys.stdout.write(self.name + line + "\n")
def _cleanup_process_async(transport: IProcessTransport, allow_missing: bool) -> None:
@ -311,6 +313,36 @@ def _run_node(reactor, node_dir, request, magic_text, finalize=True):
return d
def basic_node_configuration(request, flog_gatherer, node_dir: str):
"""
Set up common configuration options for a node, given a ``pytest`` request
fixture.
"""
config_path = join(node_dir, 'tahoe.cfg')
config = get_config(config_path)
set_config(
config,
u'node',
u'log_gatherer.furl',
flog_gatherer,
)
force_foolscap = request.config.getoption("force_foolscap")
assert force_foolscap in (True, False)
set_config(
config,
'storage',
'force_foolscap',
str(force_foolscap),
)
set_config(
config,
'client',
'force_foolscap',
str(force_foolscap),
)
write_config(FilePath(config_path), config)
def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, name, web_port,
storage=True,
magic_text=None,
@ -351,29 +383,7 @@ def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, nam
created_d = done_proto.done
def created(_):
config_path = join(node_dir, 'tahoe.cfg')
config = get_config(config_path)
set_config(
config,
u'node',
u'log_gatherer.furl',
flog_gatherer,
)
force_foolscap = request.config.getoption("force_foolscap")
assert force_foolscap in (True, False)
set_config(
config,
'storage',
'force_foolscap',
str(force_foolscap),
)
set_config(
config,
'client',
'force_foolscap',
str(force_foolscap),
)
write_config(FilePath(config_path), config)
basic_node_configuration(request, flog_gatherer, node_dir)
created_d.addCallback(created)
d = Deferred()
@ -621,16 +631,9 @@ def await_client_ready(tahoe, timeout=10, liveness=60*2, minimum_number_of_serve
server['last_received_data']
for server in servers
]
# if any times are null/None that server has never been
# contacted (so it's down still, probably)
never_received_data = server_times.count(None)
if never_received_data > 0:
print(f"waiting because {never_received_data} server(s) not contacted")
time.sleep(1)
continue
# check that all times are 'recent enough'
if any([time.time() - t > liveness for t in server_times]):
# check that all times are 'recent enough' (it's OK if _some_ servers
# are down, we just want to make sure a sufficient number are up)
if len([t for t in server_times if t is not None and time.time() - t <= liveness]) < minimum_number_of_servers:
print("waiting because too few servers have been heard from recently")
time.sleep(1)
continue
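With the liveness check, only servers heard from within the liveness window count toward minimum_number_of_servers. A small worked example with illustrative values:

import time

server_times = [time.time(), None, time.time() - 600]  # fresh, never seen, stale
liveness = 120
live = [t for t in server_times if t is not None and time.time() - t <= liveness]
assert len(live) == 1   # only the fresh server counts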

New empty newsfragment files: newsfragments/3622.minor, newsfragments/3910.minor, newsfragments/3998.minor, newsfragments/4009.minor, newsfragments/4015.minor, newsfragments/4016.minor, newsfragments/4022.minor, newsfragments/4023.minor, newsfragments/4024.minor, newsfragments/4026.minor, newsfragments/4027.minor, newsfragments/4028.minor


@ -0,0 +1,2 @@
The (still off-by-default) HTTP storage client will now use Tor when Tor-based client-side anonymity is requested.
Previously it would use normal TCP connections and not be anonymous.

New empty newsfragment file: newsfragments/4035.minor


@ -0,0 +1 @@
tahoe run now accepts --allow-stdin-close to mean "keep running if stdin closes"

nix/klein.nix (new file):

@ -0,0 +1,9 @@
{ klein, fetchPypi }:
klein.overrideAttrs (old: rec {
pname = "klein";
version = "23.5.0";
src = fetchPypi {
inherit pname version;
sha256 = "sha256-kGkSt6tBDZp/NRICg5w81zoqwHe9AHHIYcMfDu92Aoc=";
};
})

nix/pyopenssl.nix (new file):

@ -0,0 +1,10 @@
{ pyopenssl, fetchPypi, isPyPy }:
pyopenssl.overrideAttrs (old: rec {
pname = "pyOpenSSL";
version = "23.2.0";
name = "${pname}-${version}";
src = fetchPypi {
inherit pname version;
sha256 = "J2+TH1WkUufeppxxc+mE6ypEB85BPJGKo0tV+C+bi6w=";
};
})


@ -21,6 +21,25 @@ in {
pycddl = self.callPackage ./pycddl.nix { };
txi2p = self.callPackage ./txi2p.nix { };
# Some packages are of somewhat too-old versions - update them.
klein = self.callPackage ./klein.nix {
# Avoid infinite recursion.
inherit (super) klein;
};
txtorcon = self.callPackage ./txtorcon.nix {
inherit (super) txtorcon;
};
# Update the version of pyopenssl.
pyopenssl = self.callPackage ./pyopenssl.nix {
pyopenssl =
# Building the docs requires sphinx which brings in a dependency on babel,
# the test suite of which fails.
onPyPy (dontBuildDocs { sphinx-rtd-theme = null; })
# Avoid infinite recursion.
super.pyopenssl;
};
# collections-extended is currently broken for Python 3.11 in nixpkgs but
# we know where a working version lives.
collections-extended = self.callPackage ./collections-extended.nix {
@ -54,10 +73,6 @@ in {
# a5f8184fb816a4fd5ae87136838c9981e0d22c67.
six = onPyPy dontCheck super.six;
# Building the docs requires sphinx which brings in a dependency on babel,
# the test suite of which fails.
pyopenssl = onPyPy (dontBuildDocs { sphinx-rtd-theme = null; }) super.pyopenssl;
# Likewise for beautifulsoup4.
beautifulsoup4 = onPyPy (dontBuildDocs {}) super.beautifulsoup4;

nix/txtorcon.nix (new file):

@ -0,0 +1,9 @@
{ txtorcon, fetchPypi }:
txtorcon.overrideAttrs (old: rec {
pname = "txtorcon";
version = "23.5.0";
src = fetchPypi {
inherit pname version;
hash = "sha256-k/2Aqd1QX2mNCGT+k9uLapwRRLX+uRUwggtw7YmCZRw=";
};
})


@ -63,11 +63,10 @@ install_requires = [
# Twisted[conch] also depends on cryptography and Twisted[tls]
# transitively depends on cryptography. So it's anyone's guess what
# version of cryptography will *really* be installed.
"cryptography >= 2.6",
# * cryptography 40 broke constants we need; should really be using them
#   via pyOpenSSL; will be fixed in
#   https://github.com/pyca/pyopenssl/issues/1201
"cryptography >= 2.6, < 40",
# * Used for custom HTTPS validation
"pyOpenSSL >= 23.2.0",
# * The SFTP frontend depends on Twisted 11.0.0 to fix the SSH server
# rekeying bug <https://twistedmatrix.com/trac/ticket/4395>
@ -140,10 +139,10 @@ install_requires = [
"collections-extended >= 2.0.2",
# HTTP server and client
"klein",
# Latest version is necessary to work with latest werkzeug:
"klein >= 23.5.0",
# 2.2.0 has a bug: https://github.com/pallets/werkzeug/issues/2465
# 2.3.x has an incompatibility with Klein: https://github.com/twisted/klein/pull/575
"werkzeug != 2.2.0, < 2.3",
"werkzeug != 2.2.0",
"treq",
"cbor2",
@ -164,10 +163,9 @@ setup_requires = [
]
tor_requires = [
# This is exactly what `foolscap[tor]` means but pip resolves the pair of
# dependencies "foolscap[i2p] foolscap[tor]" to "foolscap[i2p]" so we lose
# this if we don't declare it ourselves!
"txtorcon >= 0.17.0",
# 23.5 added support for custom TLS contexts in web_agent(), which is
# needed for the HTTP storage client to run over Tor.
"txtorcon >= 23.5.0",
]
i2p_requires = [
@ -418,7 +416,7 @@ setup(name="tahoe-lafs", # also set in __init__.py
"subunitreporter==22.2.0",
"python-subunit==1.4.2",
"junitxml==0.7",
"coverage ~= 5.0",
"coverage==7.2.5",
],
# Here are the library dependencies of the test suite.


@ -7,10 +7,9 @@ import os
import stat
import time
import weakref
from typing import Optional
from typing import Optional, Iterable
from base64 import urlsafe_b64encode
from functools import partial
# On Python 2 this will be the backported package:
from configparser import NoSectionError
from foolscap.furl import (
@ -47,7 +46,7 @@ from allmydata.util.encodingutil import get_filesystem_encoding
from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.util.time_format import parse_duration, parse_date
from allmydata.util.i2p_provider import create as create_i2p_provider
from allmydata.util.tor_provider import create as create_tor_provider
from allmydata.util.tor_provider import create as create_tor_provider, _Provider as TorProvider
from allmydata.stats import StatsProvider
from allmydata.history import History
from allmydata.interfaces import (
@ -189,7 +188,7 @@ class Terminator(service.Service):
return service.Service.stopService(self)
def read_config(basedir, portnumfile, generated_files=[]):
def read_config(basedir, portnumfile, generated_files: Iterable=()):
"""
Read and validate configuration for a client-style Node. See
:method:`allmydata.node.read_config` for parameter meanings (the
@ -268,7 +267,7 @@ def create_client_from_config(config, _client_factory=None, _introducer_factory=
introducer_clients = create_introducer_clients(config, main_tub, _introducer_factory)
storage_broker = create_storage_farm_broker(
config, default_connection_handlers, foolscap_connection_handlers,
tub_options, introducer_clients
tub_options, introducer_clients, tor_provider
)
client = _client_factory(
@ -464,7 +463,7 @@ def create_introducer_clients(config, main_tub, _introducer_factory=None):
return introducer_clients
def create_storage_farm_broker(config: _Config, default_connection_handlers, foolscap_connection_handlers, tub_options, introducer_clients):
def create_storage_farm_broker(config: _Config, default_connection_handlers, foolscap_connection_handlers, tub_options, introducer_clients, tor_provider: Optional[TorProvider]):
"""
Create a StorageFarmBroker object, for use by Uploader/Downloader
(and everybody else who wants to use storage servers)
@ -500,6 +499,8 @@ def create_storage_farm_broker(config: _Config, default_connection_handlers, foo
tub_maker=tub_creator,
node_config=config,
storage_client_config=storage_client_config,
default_connection_handlers=default_connection_handlers,
tor_provider=tor_provider,
)
for ic in introducer_clients:
sb.use_introducer(ic)
@ -1103,7 +1104,7 @@ class _Client(node.Node, pollmixin.PollMixin):
# may get an opaque node if there were any problems.
return self.nodemaker.create_from_cap(write_uri, read_uri, deep_immutable=deep_immutable, name=name)
def create_dirnode(self, initial_children={}, version=None):
def create_dirnode(self, initial_children=None, version=None):
d = self.nodemaker.create_new_mutable_directory(initial_children, version=version)
return d


@ -678,8 +678,10 @@ class DirectoryNode(object):
return d
# XXX: Too many arguments? Worthwhile to break into mutable/immutable?
def create_subdirectory(self, namex, initial_children={}, overwrite=True,
def create_subdirectory(self, namex, initial_children=None, overwrite=True,
mutable=True, mutable_version=None, metadata=None):
if initial_children is None:
initial_children = {}
name = normalize(namex)
if self.is_readonly():
return defer.fail(NotWriteableError())


@ -1925,7 +1925,11 @@ class FakeTransport(object):
def loseConnection(self):
logmsg("FakeTransport.loseConnection()", level=NOISY)
# getPeer and getHost can just raise errors, since we don't know what to return
def getHost(self):
raise NotImplementedError()
def getPeer(self):
raise NotImplementedError()
@implementer(ISession)
@ -1990,15 +1994,18 @@ class Dispatcher(object):
def __init__(self, client):
self._client = client
def requestAvatar(self, avatarID, mind, interface):
def requestAvatar(self, avatarId, mind, *interfaces):
[interface] = interfaces
_assert(interface == IConchUser, interface=interface)
rootnode = self._client.create_node_from_uri(avatarID.rootcap)
handler = SFTPUserHandler(self._client, rootnode, avatarID.username)
rootnode = self._client.create_node_from_uri(avatarId.rootcap)
handler = SFTPUserHandler(self._client, rootnode, avatarId.username)
return (interface, handler, handler.logout)
class SFTPServer(service.MultiService):
name = "frontend:sftp"
# The type in Twisted for services is wrong in 22.10...
# https://github.com/twisted/twisted/issues/10135
name = "frontend:sftp" # type: ignore[assignment]
def __init__(self, client, accountfile,
sftp_portstr, pubkey_file, privkey_file):
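The corrected requestAvatar signature matches twisted.cred.portal.IRealm, which receives a variable number of interfaces the client is willing to accept. A minimal conforming realm, for orientation (example code, not from this commit):

from zope.interface import implementer
from twisted.cred.portal import IRealm

@implementer(IRealm)
class SingleInterfaceRealm:
    def __init__(self, make_avatar, interface):
        self._make_avatar = make_avatar
        self._interface = interface

    def requestAvatar(self, avatarId, mind, *interfaces):
        # Pick the one interface we support and return the usual
        # (interface, avatar, logout-callable) triple.
        if self._interface not in interfaces:
            raise NotImplementedError("unsupported interface requested")
        avatar = self._make_avatar(avatarId)
        return (self._interface, avatar, lambda: None)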


@ -332,7 +332,7 @@ class IncompleteHashTree(CompleteBinaryTreeMixin, list):
name += " (leaf [%d] of %d)" % (leafnum, numleaves)
return name
def set_hashes(self, hashes={}, leaves={}):
def set_hashes(self, hashes=None, leaves=None):
"""Add a bunch of hashes to the tree.
I will validate these to the best of my ability. If I already have a
@ -382,7 +382,10 @@ class IncompleteHashTree(CompleteBinaryTreeMixin, list):
corrupted or one of the received hashes was corrupted. If it raises
NotEnoughHashesError, then the otherhashes dictionary was incomplete.
"""
if hashes is None:
hashes = {}
if leaves is None:
leaves = {}
assert isinstance(hashes, dict)
for h in hashes.values():
assert isinstance(h, bytes)


@ -1391,7 +1391,9 @@ class CHKUploader(object):
def get_upload_status(self):
return self._upload_status
def read_this_many_bytes(uploadable, size, prepend_data=[]):
def read_this_many_bytes(uploadable, size, prepend_data=None):
if prepend_data is None:
prepend_data = []
if size == 0:
return defer.succeed([])
d = uploadable.read(size)
@ -1841,7 +1843,9 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
"""I am a service that allows file uploading. I am a service-child of the
Client.
"""
name = "uploader"
# The type in Twisted for services is wrong in 22.10...
# https://github.com/twisted/twisted/issues/10135
name = "uploader" # type: ignore[assignment]
URI_LIT_SIZE_THRESHOLD = 55
def __init__(self, helper_furl=None, stats_provider=None, history=None):


@ -17,11 +17,13 @@ if PY2:
from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, pow, round, super, range, max, min # noqa: F401
from past.builtins import long
from typing import Dict
from zope.interface import Interface, Attribute
from twisted.plugin import (
IPlugin,
)
from twisted.internet.defer import Deferred
from foolscap.api import StringConstraint, ListOf, TupleOf, SetOf, DictOf, \
ChoiceOf, IntegerConstraint, Any, RemoteInterface, Referenceable
@ -307,12 +309,15 @@ class RIStorageServer(RemoteInterface):
store that on disk.
"""
# The result of IStorageServer.get_version():
VersionMessage = Dict[bytes, object]
class IStorageServer(Interface):
"""
An object capable of storing shares for a storage client.
"""
def get_version():
def get_version() -> Deferred[VersionMessage]:
"""
:see: ``RIStorageServer.get_version``
"""
@ -493,47 +498,6 @@ class IStorageBroker(Interface):
@return: unicode nickname, or None
"""
# methods moved from IntroducerClient, need review
def get_all_connections():
"""Return a frozenset of (nodeid, service_name, rref) tuples, one for
each active connection we've established to a remote service. This is
mostly useful for unit tests that need to wait until a certain number
of connections have been made."""
def get_all_connectors():
"""Return a dict that maps from (nodeid, service_name) to a
RemoteServiceConnector instance for all services that we are actively
trying to connect to. Each RemoteServiceConnector has the following
public attributes::
service_name: the type of service provided, like 'storage'
last_connect_time: when we last established a connection
last_loss_time: when we last lost a connection
version: the peer's version, from the most recent connection
oldest_supported: the peer's oldest supported version, same
rref: the RemoteReference, if connected, otherwise None
This method is intended for monitoring interfaces, such as a web page
that describes connecting and connected peers.
"""
def get_all_peerids():
"""Return a frozenset of all peerids to whom we have a connection (to
one or more services) established. Mostly useful for unit tests."""
def get_all_connections_for(service_name):
"""Return a frozenset of (nodeid, service_name, rref) tuples, one
for each active connection that provides the given SERVICE_NAME."""
def get_permuted_peers(service_name, key):
"""Returns an ordered list of (peerid, rref) tuples, selecting from
the connections that provide SERVICE_NAME, using a hash-based
permutation keyed by KEY. This randomizes the service list in a
repeatable way, to distribute load over many peers.
"""
class IDisplayableServer(Interface):
def get_nickname():
@ -551,16 +515,6 @@ class IServer(IDisplayableServer):
def start_connecting(trigger_cb):
pass
def get_rref():
"""Obsolete. Use ``get_storage_server`` instead.
Once a server is connected, I return a RemoteReference.
Before a server is connected for the first time, I return None.
Note that the rref I return will start producing DeadReferenceErrors
once the connection is lost.
"""
def upload_permitted():
"""
:return: True if we should use this server for uploads, False
@ -1447,7 +1401,7 @@ class IDirectoryNode(IFilesystemNode):
is a file, or if must_be_file is True and the child is a directory,
I raise ChildOfWrongTypeError."""
def create_subdirectory(name, initial_children={}, overwrite=True,
def create_subdirectory(name, initial_children=None, overwrite=True,
mutable=True, mutable_version=None, metadata=None):
"""I create and attach a directory at the given name. The new
directory can be empty, or it can be populated with children
@ -2586,7 +2540,7 @@ class IClient(Interface):
@return: a Deferred that fires with an IMutableFileNode instance.
"""
def create_dirnode(initial_children={}):
def create_dirnode(initial_children=None):
"""Create a new unattached dirnode, possibly with initial children.
@param initial_children: dict with keys that are unicode child names,
@ -2641,7 +2595,7 @@ class INodeMaker(Interface):
for use by unit tests, to create mutable files that are smaller than
usual."""
def create_new_mutable_directory(initial_children={}):
def create_new_mutable_directory(initial_children=None):
"""I create a new mutable directory, and return a Deferred that will
fire with the IDirectoryNode instance when it is ready. If
initial_children= is provided (a dict mapping unicode child name to


@ -35,7 +35,7 @@ class InvalidCacheError(Exception):
V2 = b"http://allmydata.org/tahoe/protocols/introducer/v2"
@implementer(RIIntroducerSubscriberClient_v2, IIntroducerClient)
@implementer(RIIntroducerSubscriberClient_v2, IIntroducerClient) # type: ignore[misc]
class IntroducerClient(service.Service, Referenceable):
def __init__(self, tub, introducer_furl,


@ -142,9 +142,12 @@ def stringify_remote_address(rref):
return str(remote)
# MyPy doesn't work well with remote interfaces...
@implementer(RIIntroducerPublisherAndSubscriberService_v2)
class IntroducerService(service.MultiService, Referenceable):
name = "introducer"
class IntroducerService(service.MultiService, Referenceable): # type: ignore[misc]
# The type in Twisted for services is wrong in 22.10...
# https://github.com/twisted/twisted/issues/10135
name = "introducer" # type: ignore[assignment]
# v1 is the original protocol, added in 1.0 (but only advertised starting
# in 1.3), removed in 1.12. v2 is the new signed protocol, added in 1.10
# TODO: reconcile bytes/str for keys


@ -17,7 +17,7 @@ import errno
from base64 import b32decode, b32encode
from errno import ENOENT, EPERM
from warnings import warn
from typing import Union
from typing import Union, Iterable
import attr
@ -172,7 +172,7 @@ def create_node_dir(basedir, readme_text):
f.write(readme_text)
def read_config(basedir, portnumfile, generated_files=[], _valid_config=None):
def read_config(basedir, portnumfile, generated_files: Iterable = (), _valid_config=None):
"""
Read and validate configuration.
@ -741,7 +741,7 @@ def create_connection_handlers(config, i2p_provider, tor_provider):
def create_tub(tub_options, default_connection_handlers, foolscap_connection_handlers,
handler_overrides={}, force_foolscap=False, **kwargs):
handler_overrides=None, force_foolscap=False, **kwargs):
"""
Create a Tub with the right options and handlers. It will be
ephemeral unless the caller provides certFile= in kwargs
@ -755,6 +755,8 @@ def create_tub(tub_options, default_connection_handlers, foolscap_connection_han
:param bool force_foolscap: If True, only allow Foolscap, not just HTTPS
storage protocol.
"""
if handler_overrides is None:
handler_overrides = {}
# We listen simultaneously for both Foolscap and HTTPS on the same port,
# so we have to create a special Foolscap Tub for that to work:
if force_foolscap:
@ -922,7 +924,7 @@ def tub_listen_on(i2p_provider, tor_provider, tub, tubport, location):
def create_main_tub(config, tub_options,
default_connection_handlers, foolscap_connection_handlers,
i2p_provider, tor_provider,
handler_overrides={}, cert_filename="node.pem"):
handler_overrides=None, cert_filename="node.pem"):
"""
Creates a 'main' Foolscap Tub, typically for use as the top-level
access point for a running Node.
@ -943,6 +945,8 @@ def create_main_tub(config, tub_options,
:param tor_provider: None, or a _Provider instance if txtorcon +
Tor are installed.
"""
if handler_overrides is None:
handler_overrides = {}
portlocation = _tub_portlocation(
config,
iputil.get_local_addresses_sync,


@ -135,8 +135,9 @@ class NodeMaker(object):
d.addCallback(lambda res: n)
return d
def create_new_mutable_directory(self, initial_children={}, version=None):
# initial_children must have metadata (i.e. {} instead of None)
def create_new_mutable_directory(self, initial_children=None, version=None):
if initial_children is None:
initial_children = {}
for (name, (node, metadata)) in initial_children.items():
precondition(isinstance(metadata, dict),
"create_new_mutable_directory requires metadata to be a dict, not None", metadata)


@ -16,9 +16,10 @@ later in the configuration process.
from __future__ import annotations
from itertools import chain
from typing import cast
from twisted.internet.protocol import Protocol
from twisted.internet.interfaces import IDelayedCall
from twisted.internet.interfaces import IDelayedCall, IReactorFromThreads
from twisted.internet.ssl import CertificateOptions
from twisted.web.server import Site
from twisted.protocols.tls import TLSMemoryBIOFactory
@ -89,7 +90,7 @@ class _FoolscapOrHttps(Protocol, metaclass=_PretendToBeNegotiation):
certificate=cls.tub.myCertificate.original,
)
http_storage_server = HTTPServer(reactor, storage_server, swissnum)
http_storage_server = HTTPServer(cast(IReactorFromThreads, reactor), storage_server, swissnum)
cls.https_factory = TLSMemoryBIOFactory(
certificate_options,
False,
@ -102,8 +103,15 @@ class _FoolscapOrHttps(Protocol, metaclass=_PretendToBeNegotiation):
for location_hint in chain.from_iterable(
hints.split(",") for hints in cls.tub.locationHints
):
if location_hint.startswith("tcp:"):
_, hostname, port = location_hint.split(":")
if location_hint.startswith("tcp:") or location_hint.startswith("tor:"):
scheme, hostname, port = location_hint.split(":")
if scheme == "tcp":
subscheme = None
else:
subscheme = "tor"
# If we're listening on Tor, the hostname needs to have an
# .onion TLD.
assert hostname.endswith(".onion")
port = int(port)
storage_nurls.add(
build_nurl(
@ -111,9 +119,10 @@ class _FoolscapOrHttps(Protocol, metaclass=_PretendToBeNegotiation):
port,
str(swissnum, "ascii"),
cls.tub.myCertificate.original.to_cryptography(),
subscheme
)
)
# TODO this is probably where we'll have to support Tor and I2P?
# TODO this is where we'll have to support Tor and I2P as well.
# See https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3888#comment:9
# for discussion (there will be separate tickets added for those at
# some point.)
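For orientation, the NURLs built here follow the pattern pb://<certificate-hash>@<host>:<port>/<swissnum>#v=1, with the scheme becoming pb+tor when the subscheme is "tor" (all values below are made up):

# Plain TCP location hint -> pb NURL:
#   pb://ly7c4bae...@example.com:3457/xyzzy42...#v=1
# Tor location hint -> pb+tor NURL; the host must be a .onion address:
#   pb+tor://ly7c4bae...@abcdefgh.onion:3457/xyzzy42...#v=1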


@ -112,6 +112,9 @@ class AddGridManagerCertOptions(BaseOptions):
return "Usage: tahoe [global-options] admin add-grid-manager-cert [options]"
def postOptions(self) -> None:
assert self.parent is not None
assert self.parent.parent is not None
if self['name'] is None:
raise usage.UsageError(
"Must provide --name option"
@ -123,8 +126,8 @@ class AddGridManagerCertOptions(BaseOptions):
data: str
if self['filename'] == '-':
print("reading certificate from stdin", file=self.parent.parent.stderr)
data = self.parent.parent.stdin.read()
print("reading certificate from stdin", file=self.parent.parent.stderr) # type: ignore[attr-defined]
data = self.parent.parent.stdin.read() # type: ignore[attr-defined]
if len(data) == 0:
raise usage.UsageError(
"Reading certificate from stdin failed"


@ -104,6 +104,11 @@ class RunOptions(BasedirOptions):
" [default: %s]" % quote_local_unicode_path(_default_nodedir)),
]
optFlags = [
("allow-stdin-close", None,
'Do not exit when stdin closes ("tahoe run" otherwise will exit).'),
]
def parseArgs(self, basedir=None, *twistd_args):
# This can't handle e.g. 'tahoe run --reactor=foo', since
# '--reactor=foo' looks like an option to the tahoe subcommand, not to
@ -156,6 +161,7 @@ class DaemonizeTheRealService(Service, HookMixin):
"running": None,
}
self.stderr = options.parent.stderr
self._close_on_stdin_close = not options["allow-stdin-close"]
def startService(self):
@ -199,10 +205,12 @@ class DaemonizeTheRealService(Service, HookMixin):
d = service_factory()
def created(srv):
srv.setServiceParent(self.parent)
if self.parent is not None:
srv.setServiceParent(self.parent)
# exiting on stdin-closed facilitates cleanup when run
# as a subprocess
on_stdin_close(reactor, reactor.stop)
if self._close_on_stdin_close:
on_stdin_close(reactor, reactor.stop)
d.addCallback(created)
d.addErrback(handle_config_error)
d.addBoth(self._call_hook, 'running')
@ -213,11 +221,13 @@ class DaemonizeTheRealService(Service, HookMixin):
class DaemonizeTahoeNodePlugin(object):
tapname = "tahoenode"
def __init__(self, nodetype, basedir):
def __init__(self, nodetype, basedir, allow_stdin_close):
self.nodetype = nodetype
self.basedir = basedir
self.allow_stdin_close = allow_stdin_close
def makeService(self, so):
so["allow-stdin-close"] = self.allow_stdin_close
return DaemonizeTheRealService(self.nodetype, self.basedir, so)
@ -304,7 +314,9 @@ def run(reactor, config, runApp=twistd.runApp):
print(config, file=err)
print("tahoe %s: usage error from twistd: %s\n" % (config.subcommand_name, ue), file=err)
return 1
twistd_config.loadedPlugins = {"DaemonizeTahoeNode": DaemonizeTahoeNodePlugin(nodetype, basedir)}
twistd_config.loadedPlugins = {
"DaemonizeTahoeNode": DaemonizeTahoeNodePlugin(nodetype, basedir, config["allow-stdin-close"])
}
# our own pid-style file contains PID and process creation time
pidfile = FilePath(get_pidfile(config['basedir']))
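Exiting when stdin closes lets a parent process that spawned "tahoe run" clean up the child simply by exiting, which closes its end of the pipe. A rough sketch of the mechanism behind a helper like on_stdin_close (assumed implementation, not the real one used here):

import sys
from twisted.internet.threads import deferToThread

def watch_stdin_close(on_close):
    def wait_for_eof():
        # read() returns b"" only at EOF, i.e. once the parent process
        # closes its end of the pipe.
        while sys.stdin.buffer.read(1024):
            pass
    # Run the blocking read off the reactor thread; the callback fires
    # back on the reactor once EOF arrives.
    deferToThread(wait_for_eof).addCallback(lambda _: on_close())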


@ -39,6 +39,10 @@ def si_b2a(storageindex):
def si_a2b(ascii_storageindex):
return base32.a2b(ascii_storageindex)
def si_to_human_readable(storageindex: bytes) -> str:
"""Create human-readable string of storage index."""
return str(base32.b2a(storageindex), "ascii")
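For example (a made-up storage index; storage indexes are 16 bytes):

si = b"\x01" * 16
si_to_human_readable(si)   # 26 chars of lowercase base32: "aeaqcaibaeaqcaibaeaqcaibae"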
def storage_index_to_dir(storageindex):
"""Convert storage index to directory path.


@ -4,13 +4,27 @@ HTTP client that talks to the HTTP storage server.
from __future__ import annotations
from eliot import start_action, register_exception_extractor
from typing import Union, Optional, Sequence, Mapping, BinaryIO, cast, TypedDict, Set
from typing import (
Union,
Optional,
Sequence,
Mapping,
BinaryIO,
cast,
TypedDict,
Set,
Dict,
Callable,
ClassVar,
)
from base64 import b64encode
from io import BytesIO
from os import SEEK_END
from attrs import define, asdict, frozen, field
from eliot import start_action, register_exception_extractor
from eliot.twisted import DeferredContext
# TODO Make sure to import Python version?
from cbor2 import loads, dumps
@ -19,8 +33,8 @@ from collections_extended import RangeMap
from werkzeug.datastructures import Range, ContentRange
from twisted.web.http_headers import Headers
from twisted.web import http
from twisted.web.iweb import IPolicyForHTTPS, IResponse
from twisted.internet.defer import inlineCallbacks, Deferred, succeed
from twisted.web.iweb import IPolicyForHTTPS, IResponse, IAgent
from twisted.internet.defer import Deferred, succeed
from twisted.internet.interfaces import (
IOpenSSLClientConnectionCreator,
IReactorTime,
@ -34,7 +48,6 @@ import treq
from treq.client import HTTPClient
from treq.testing import StubTreq
from OpenSSL import SSL
from cryptography.hazmat.bindings.openssl.binding import Binding
from werkzeug.http import parse_content_range_header
from .http_common import (
@ -43,12 +56,20 @@ from .http_common import (
get_content_type,
CBOR_MIME_TYPE,
get_spki_hash,
response_is_not_html,
)
from .common import si_b2a
from ..interfaces import VersionMessage
from .common import si_b2a, si_to_human_readable
from ..util.hashutil import timing_safe_compare
from ..util.deferredutil import async_to_deferred
from ..util.tor_provider import _Provider as TorProvider
_OPENSSL = Binding().lib
try:
from txtorcon import Tor # type: ignore
except ImportError:
class Tor: # type: ignore[no-redef]
pass
def _encode_si(si): # type: (bytes) -> str
@ -159,15 +180,24 @@ def limited_content(
This will time out if no data is received for 60 seconds; so long as a
trickle of data continues to arrive, it will continue to run.
"""
d = succeed(None)
timeout = clock.callLater(60, d.cancel)
result_deferred = succeed(None)
# Sadly, addTimeout() won't work because we need access to the IDelayedCall
# in order to reset it on each data chunk received.
timeout = clock.callLater(60, result_deferred.cancel)
collector = _LengthLimitedCollector(max_length, timeout)
with start_action(
action_type="allmydata:storage:http-client:limited-content",
max_length=max_length,
).context():
d = DeferredContext(result_deferred)
# Make really sure everything gets called in Deferred context, treq might
# call collector directly...
d.addCallback(lambda _: treq.collect(response, collector))
def done(_):
def done(_: object) -> BytesIO:
timeout.cancel()
collector.f.seek(0)
return collector.f
@ -177,7 +207,8 @@ def limited_content(
timeout.cancel()
return f
return d.addCallbacks(done, failed)
result = d.addCallbacks(done, failed)
return result.addActionFinish()
@define
@ -234,11 +265,11 @@ class _TLSContextFactory(CertificateOptions):
# not the usual TLS concerns about invalid CAs or revoked
# certificates.
things_are_ok = (
_OPENSSL.X509_V_OK,
_OPENSSL.X509_V_ERR_CERT_NOT_YET_VALID,
_OPENSSL.X509_V_ERR_CERT_HAS_EXPIRED,
_OPENSSL.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT,
_OPENSSL.X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN,
SSL.X509VerificationCodes.OK,
SSL.X509VerificationCodes.ERR_CERT_NOT_YET_VALID,
SSL.X509VerificationCodes.ERR_CERT_HAS_EXPIRED,
SSL.X509VerificationCodes.ERR_DEPTH_ZERO_SELF_SIGNED_CERT,
SSL.X509VerificationCodes.ERR_SELF_SIGNED_CERT_IN_CHAIN,
)
# TODO can we do this once instead of multiple times?
if errno in things_are_ok and timing_safe_compare(
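These symbolic names come from pyOpenSSL's X509VerificationCodes class, which wraps the raw X509_V_* integers previously pulled out of the cryptography bindings; this matches the new pyOpenSSL >= 23.2.0 floor in setup.py above. For instance:

from OpenSSL import SSL

SSL.X509VerificationCodes.OK                    # wraps X509_V_OK (0)
SSL.X509VerificationCodes.ERR_CERT_HAS_EXPIRED  # wraps X509_V_ERR_CERT_HAS_EXPIRED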
@ -279,18 +310,30 @@ class _StorageClientHTTPSPolicy:
)
@define(hash=True)
class StorageClient(object):
@define
class StorageClientFactory:
"""
Low-level HTTP client that talks to the HTTP storage server.
Create ``StorageClient`` instances, using appropriate
``twisted.web.iweb.IAgent`` for different connection methods: normal TCP,
Tor, and eventually I2P.
There is some caching involved since there might be shared setup work, e.g.
connecting to the local Tor service only needs to happen once.
"""
# If set, we're doing unit testing and we should call this with
# HTTPConnectionPool we create.
TEST_MODE_REGISTER_HTTP_POOL = None
_default_connection_handlers: dict[str, str]
_tor_provider: Optional[TorProvider]
# Cache the Tor instance created by the provider, if relevant.
_tor_instance: Optional[Tor] = None
# If set, we're doing unit testing and we should call this with any
# HTTPConnectionPool that gets passed/created to ``create_agent()``.
TEST_MODE_REGISTER_HTTP_POOL: ClassVar[
Optional[Callable[[HTTPConnectionPool], None]]
] = None
@classmethod
def start_test_mode(cls, callback):
def start_test_mode(cls, callback: Callable[[HTTPConnectionPool], None]) -> None:
"""Switch to testing mode.
In testing mode we register the pool with test system using the given
@ -305,41 +348,88 @@ class StorageClient(object):
"""Stop testing mode."""
cls.TEST_MODE_REGISTER_HTTP_POOL = None
# The URL is a HTTPS URL ("https://..."). To construct from a NURL, use
# ``StorageClient.from_nurl()``.
async def _create_agent(
self,
nurl: DecodedURL,
reactor: object,
tls_context_factory: IPolicyForHTTPS,
pool: HTTPConnectionPool,
) -> IAgent:
"""Create a new ``IAgent``, possibly using Tor."""
if self.TEST_MODE_REGISTER_HTTP_POOL is not None:
self.TEST_MODE_REGISTER_HTTP_POOL(pool)
# TODO default_connection_handlers should really be an object, not a
# dict, so we can ask "is this using Tor" without poking at a
# dictionary with arbitrary strings... See
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/4032
handler = self._default_connection_handlers["tcp"]
if handler == "tcp":
return Agent(reactor, tls_context_factory, pool=pool)
if handler == "tor" or nurl.scheme == "pb+tor":
assert self._tor_provider is not None
if self._tor_instance is None:
self._tor_instance = await self._tor_provider.get_tor_instance(reactor)
return self._tor_instance.web_agent(
pool=pool, tls_context_factory=tls_context_factory
)
else:
raise RuntimeError(f"Unsupported tcp connection handler: {handler}")
async def create_storage_client(
self,
nurl: DecodedURL,
reactor: IReactorTime,
pool: Optional[HTTPConnectionPool] = None,
) -> StorageClient:
"""Create a new ``StorageClient`` for the given NURL."""
assert nurl.fragment == "v=1"
assert nurl.scheme in ("pb", "pb+tor")
if pool is None:
pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 10
certificate_hash = nurl.user.encode("ascii")
agent = await self._create_agent(
nurl,
reactor,
_StorageClientHTTPSPolicy(expected_spki_hash=certificate_hash),
pool,
)
treq_client = HTTPClient(agent)
https_url = DecodedURL().replace(scheme="https", host=nurl.host, port=nurl.port)
swissnum = nurl.path[0].encode("ascii")
response_check = lambda _: None
if self.TEST_MODE_REGISTER_HTTP_POOL is not None:
response_check = response_is_not_html
return StorageClient(
https_url,
swissnum,
treq_client,
pool,
reactor,
response_check,
)
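A hypothetical use of the factory (fixture names assumed; attrs strips the leading underscore from attribute names for the generated __init__):

factory = StorageClientFactory(
    default_connection_handlers={"tcp": "tor"},   # from the [connections] config
    tor_provider=tor_provider,                    # or None for plain TCP
)
client = await factory.create_storage_client(nurl, reactor)
version = await StorageClientGeneral(client).get_version()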
@define(hash=True)
class StorageClient(object):
"""
Low-level HTTP client that talks to the HTTP storage server.
Create using a ``StorageClientFactory`` instance.
"""
# The URL should be a HTTPS URL ("https://...")
_base_url: DecodedURL
_swissnum: bytes
_treq: Union[treq, StubTreq, HTTPClient]
_pool: HTTPConnectionPool
_clock: IReactorTime
@classmethod
def from_nurl(
cls, nurl: DecodedURL, reactor, pool: Optional[HTTPConnectionPool] = None
) -> StorageClient:
"""
Create a ``StorageClient`` for the given NURL.
"""
assert nurl.fragment == "v=1"
assert nurl.scheme == "pb"
swissnum = nurl.path[0].encode("ascii")
certificate_hash = nurl.user.encode("ascii")
if pool is None:
pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 20
if cls.TEST_MODE_REGISTER_HTTP_POOL is not None:
cls.TEST_MODE_REGISTER_HTTP_POOL(pool)
treq_client = HTTPClient(
Agent(
reactor,
_StorageClientHTTPSPolicy(expected_spki_hash=certificate_hash),
pool=pool,
)
)
https_url = DecodedURL().replace(scheme="https", host=nurl.host, port=nurl.port)
return cls(https_url, swissnum, treq_client, reactor)
# Are we running unit tests?
_analyze_response: Callable[[IResponse], None] = lambda _: None
def relative_url(self, path: str) -> DecodedURL:
"""Get a URL relative to the base URL."""
@ -443,11 +533,14 @@ class StorageClient(object):
kwargs["data"] = dumps(message_to_serialize)
headers.addRawHeader("Content-Type", CBOR_MIME_TYPE)
return await self._treq.request(
response = await self._treq.request(
method, url, headers=headers, timeout=timeout, **kwargs
)
self._analyze_response(response)
async def decode_cbor(self, response, schema: Schema) -> object:
return response
async def decode_cbor(self, response: IResponse, schema: Schema) -> object:
"""Given HTTP response, return decoded CBOR body."""
with start_action(action_type="allmydata:storage:http-client:decode-cbor"):
if response.code > 199 and response.code < 300:
@ -470,6 +563,10 @@ class StorageClient(object):
).read()
raise ClientException(response.code, response.phrase, data)
def shutdown(self) -> Deferred:
"""Shutdown any connections."""
return self._pool.closeCachedConnections()
@define(hash=True)
class StorageClientGeneral(object):
@ -480,21 +577,28 @@ class StorageClientGeneral(object):
_client: StorageClient
@async_to_deferred
async def get_version(self):
async def get_version(self) -> VersionMessage:
"""
Return the version metadata for the server.
"""
with start_action(
action_type="allmydata:storage:http-client:get-version",
):
return await self._get_version()
async def _get_version(self) -> VersionMessage:
"""Implementation of get_version()."""
url = self._client.relative_url("/storage/v1/version")
response = await self._client.request("GET", url)
decoded_response = cast(
Mapping[bytes, object],
Dict[bytes, object],
await self._client.decode_cbor(response, _SCHEMAS["get_version"]),
)
# Add some features we know are true because the HTTP API
# specification requires them and because other parts of the storage
# client implementation assumes they will be present.
cast(
Mapping[bytes, object],
Dict[bytes, object],
decoded_response[b"http://allmydata.org/tahoe/protocols/storage/v1"],
).update(
{
@ -506,20 +610,31 @@ class StorageClientGeneral(object):
)
return decoded_response
@inlineCallbacks
def add_or_renew_lease(
@async_to_deferred
async def add_or_renew_lease(
self, storage_index: bytes, renew_secret: bytes, cancel_secret: bytes
) -> Deferred[None]:
) -> None:
"""
Add or renew a lease.
If the renewal secret matches an existing lease, it is renewed.
Otherwise a new lease is added.
"""
with start_action(
action_type="allmydata:storage:http-client:add-or-renew-lease",
storage_index=si_to_human_readable(storage_index),
):
return await self._add_or_renew_lease(
storage_index, renew_secret, cancel_secret
)
async def _add_or_renew_lease(
self, storage_index: bytes, renew_secret: bytes, cancel_secret: bytes
) -> None:
url = self._client.relative_url(
"/storage/v1/lease/{}".format(_encode_si(storage_index))
)
response = yield self._client.request(
response = await self._client.request(
"PUT",
url,
lease_renew_secret=renew_secret,
@ -544,15 +659,15 @@ class UploadProgress(object):
required: RangeMap
@inlineCallbacks
def read_share_chunk(
@async_to_deferred
async def read_share_chunk(
client: StorageClient,
share_type: str,
storage_index: bytes,
share_number: int,
offset: int,
length: int,
) -> Deferred[bytes]:
) -> bytes:
"""
Download a chunk of data from a share.
@ -573,7 +688,7 @@ def read_share_chunk(
# The default 60 second timeout is for getting the response, so it doesn't
# include the time it takes to download the body... so we will will deal
# with that later, via limited_content().
response = yield client.request(
response = await client.request(
"GET",
url,
headers=Headers(
@ -587,6 +702,12 @@ def read_share_chunk(
if response.code == http.NO_CONTENT:
return b""
content_type = get_content_type(response.headers)
if content_type != "application/octet-stream":
raise ValueError(
f"Content-type was wrong: {content_type}, should be application/octet-stream"
)
if response.code == http.PARTIAL_CONTENT:
content_range = parse_content_range_header(
response.headers.getRawHeaders("content-range")[0] or ""
@ -604,7 +725,7 @@ def read_share_chunk(
raise ValueError("Server sent more than we asked for?!")
# It might also send less than we asked for. That's (probably) OK, e.g.
# if we went past the end of the file.
body = yield limited_content(response, client._clock, supposed_length)
body = await limited_content(response, client._clock, supposed_length)
body.seek(0, SEEK_END)
actual_length = body.tell()
if actual_length != supposed_length:
@ -630,7 +751,7 @@ async def advise_corrupt_share(
storage_index: bytes,
share_number: int,
reason: str,
):
) -> None:
assert isinstance(reason, str)
url = client.relative_url(
"/storage/v1/{}/{}/{}/corrupt".format(
@ -678,6 +799,35 @@ class StorageClientImmutables(object):
Result fires when creating the storage index succeeded, if creating the
storage index failed the result will fire with an exception.
"""
with start_action(
action_type="allmydata:storage:http-client:immutable:create",
storage_index=si_to_human_readable(storage_index),
share_numbers=share_numbers,
allocated_size=allocated_size,
) as ctx:
result = await self._create(
storage_index,
share_numbers,
allocated_size,
upload_secret,
lease_renew_secret,
lease_cancel_secret,
)
ctx.add_success_fields(
already_have=result.already_have, allocated=result.allocated
)
return result
async def _create(
self,
storage_index: bytes,
share_numbers: set[int],
allocated_size: int,
upload_secret: bytes,
lease_renew_secret: bytes,
lease_cancel_secret: bytes,
) -> ImmutableCreateResult:
"""Implementation of create()."""
url = self._client.relative_url(
"/storage/v1/immutable/" + _encode_si(storage_index)
)
@ -700,17 +850,28 @@ class StorageClientImmutables(object):
allocated=decoded_response["allocated"],
)
@inlineCallbacks
def abort_upload(
@async_to_deferred
async def abort_upload(
self, storage_index: bytes, share_number: int, upload_secret: bytes
) -> Deferred[None]:
) -> None:
"""Abort the upload."""
with start_action(
action_type="allmydata:storage:http-client:immutable:abort-upload",
storage_index=si_to_human_readable(storage_index),
share_number=share_number,
):
return await self._abort_upload(storage_index, share_number, upload_secret)
async def _abort_upload(
self, storage_index: bytes, share_number: int, upload_secret: bytes
) -> None:
"""Implementation of ``abort_upload()``."""
url = self._client.relative_url(
"/storage/v1/immutable/{}/{}/abort".format(
_encode_si(storage_index), share_number
)
)
response = yield self._client.request(
response = await self._client.request(
"PUT",
url,
upload_secret=upload_secret,
@ -744,6 +905,28 @@ class StorageClientImmutables(object):
whether the _complete_ share (i.e. all chunks, not just this one) has
been uploaded.
"""
with start_action(
action_type="allmydata:storage:http-client:immutable:write-share-chunk",
storage_index=si_to_human_readable(storage_index),
share_number=share_number,
offset=offset,
data_len=len(data),
) as ctx:
result = await self._write_share_chunk(
storage_index, share_number, upload_secret, offset, data
)
ctx.add_success_fields(finished=result.finished)
return result
async def _write_share_chunk(
self,
storage_index: bytes,
share_number: int,
upload_secret: bytes,
offset: int,
data: bytes,
) -> UploadProgress:
"""Implementation of ``write_share_chunk()``."""
url = self._client.relative_url(
"/storage/v1/immutable/{}/{}".format(
_encode_si(storage_index), share_number
@ -784,21 +967,41 @@ class StorageClientImmutables(object):
remaining.set(True, chunk["begin"], chunk["end"])
return UploadProgress(finished=finished, required=remaining)
def read_share_chunk(
self, storage_index, share_number, offset, length
): # type: (bytes, int, int, int) -> Deferred[bytes]
@async_to_deferred
async def read_share_chunk(
self, storage_index: bytes, share_number: int, offset: int, length: int
) -> bytes:
"""
Download a chunk of data from a share.
"""
return read_share_chunk(
self._client, "immutable", storage_index, share_number, offset, length
)
with start_action(
action_type="allmydata:storage:http-client:immutable:read-share-chunk",
storage_index=si_to_human_readable(storage_index),
share_number=share_number,
offset=offset,
length=length,
) as ctx:
result = await read_share_chunk(
self._client, "immutable", storage_index, share_number, offset, length
)
ctx.add_success_fields(data_len=len(result))
return result
@async_to_deferred
async def list_shares(self, storage_index: bytes) -> Set[int]:
"""
Return the set of shares for a given storage index.
"""
with start_action(
action_type="allmydata:storage:http-client:immutable:list-shares",
storage_index=si_to_human_readable(storage_index),
) as ctx:
result = await self._list_shares(storage_index)
ctx.add_success_fields(shares=result)
return result
async def _list_shares(self, storage_index: bytes) -> Set[int]:
"""Implementation of ``list_shares()``."""
url = self._client.relative_url(
"/storage/v1/immutable/{}/shares".format(_encode_si(storage_index))
)
@ -814,16 +1017,23 @@ class StorageClientImmutables(object):
else:
raise ClientException(response.code)
def advise_corrupt_share(
@async_to_deferred
async def advise_corrupt_share(
self,
storage_index: bytes,
share_number: int,
reason: str,
):
) -> None:
"""Indicate a share has been corrupted, with a human-readable message."""
return advise_corrupt_share(
self._client, "immutable", storage_index, share_number, reason
)
with start_action(
action_type="allmydata:storage:http-client:immutable:advise-corrupt-share",
storage_index=si_to_human_readable(storage_index),
share_number=share_number,
reason=reason,
):
await advise_corrupt_share(
self._client, "immutable", storage_index, share_number, reason
)
@frozen
@ -914,6 +1124,29 @@ class StorageClientMutables:
Given a mapping between share numbers and test/write vectors, the tests
are done and if they are valid the writes are done.
"""
with start_action(
action_type="allmydata:storage:http-client:mutable:read-test-write",
storage_index=si_to_human_readable(storage_index),
):
return await self._read_test_write_chunks(
storage_index,
write_enabler_secret,
lease_renew_secret,
lease_cancel_secret,
testwrite_vectors,
read_vector,
)
async def _read_test_write_chunks(
self,
storage_index: bytes,
write_enabler_secret: bytes,
lease_renew_secret: bytes,
lease_cancel_secret: bytes,
testwrite_vectors: dict[int, TestWriteVectors],
read_vector: list[ReadVector],
) -> ReadTestWriteResult:
"""Implementation of ``read_test_write_chunks()``."""
url = self._client.relative_url(
"/storage/v1/mutable/{}/read-test-write".format(_encode_si(storage_index))
)
@ -943,25 +1176,45 @@ class StorageClientMutables:
else:
raise ClientException(response.code, (await response.content()))
def read_share_chunk(
@async_to_deferred
async def read_share_chunk(
self,
storage_index: bytes,
share_number: int,
offset: int,
length: int,
) -> Deferred[bytes]:
) -> bytes:
"""
Download a chunk of data from a share.
"""
return read_share_chunk(
self._client, "mutable", storage_index, share_number, offset, length
)
with start_action(
action_type="allmydata:storage:http-client:mutable:read-share-chunk",
storage_index=si_to_human_readable(storage_index),
share_number=share_number,
offset=offset,
length=length,
) as ctx:
result = await read_share_chunk(
self._client, "mutable", storage_index, share_number, offset, length
)
ctx.add_success_fields(data_len=len(result))
return result
@async_to_deferred
async def list_shares(self, storage_index: bytes) -> Set[int]:
"""
List the share numbers for a given storage index.
"""
with start_action(
action_type="allmydata:storage:http-client:mutable:list-shares",
storage_index=si_to_human_readable(storage_index),
) as ctx:
result = await self._list_shares(storage_index)
ctx.add_success_fields(shares=result)
return result
async def _list_shares(self, storage_index: bytes) -> Set[int]:
"""Implementation of ``list_shares()``."""
url = self._client.relative_url(
"/storage/v1/mutable/{}/shares".format(_encode_si(storage_index))
)
@ -976,13 +1229,20 @@ class StorageClientMutables:
else:
raise ClientException(response.code)
def advise_corrupt_share(
@async_to_deferred
async def advise_corrupt_share(
self,
storage_index: bytes,
share_number: int,
reason: str,
):
) -> None:
"""Indicate a share has been corrupted, with a human-readable message."""
return advise_corrupt_share(
self._client, "mutable", storage_index, share_number, reason
)
with start_action(
action_type="allmydata:storage:http-client:mutable:advise-corrupt-share",
storage_index=si_to_human_readable(storage_index),
share_number=share_number,
reason=reason,
):
await advise_corrupt_share(
self._client, "mutable", storage_index, share_number, reason
)
@ -12,6 +12,7 @@ from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
from werkzeug.http import parse_options_header
from twisted.web.http_headers import Headers
from twisted.web.iweb import IResponse
CBOR_MIME_TYPE = "application/cbor"
@ -22,11 +23,23 @@ def get_content_type(headers: Headers) -> Optional[str]:
Returns ``None`` if no content-type was set.
"""
values = headers.getRawHeaders("content-type") or [None]
values = headers.getRawHeaders("content-type", [None]) or [None]
content_type = parse_options_header(values[0])[0] or None
return content_type
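Both fallbacks matter here: ``getRawHeaders`` returns the supplied default when the header is absent, while the trailing ``or [None]`` guards against a present-but-empty value list. A quick sketch of the assumed Twisted behavior:

    from twisted.web.http_headers import Headers

    assert Headers({}).getRawHeaders("content-type", [None]) == [None]
    assert Headers({"Content-Type": ["application/cbor"]}).getRawHeaders(
        "content-type", [None]
    ) == ["application/cbor"]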
def response_is_not_html(response: IResponse) -> None:
"""
During tests, this is registered so we can ensure the web server
doesn't give us text/html.
HTML is never correct except in 404 responses, but it's the default for
Twisted's web server, so we assert nothing unexpected happened.
"""
if response.code != 404:
assert get_content_type(response.headers) != "text/html"
def swissnum_auth_header(swissnum: bytes) -> bytes:
"""Return value for ``Authorization`` header."""
return b"Tahoe-LAFS " + b64encode(swissnum).strip()
@ -4,7 +4,7 @@ HTTP server for storage.
from __future__ import annotations
from typing import Any, Callable, Union, cast
from typing import Any, Callable, Union, cast, Optional
from functools import wraps
from base64 import b64decode
import binascii
@ -75,7 +75,7 @@ def _extract_secrets(
secrets, return dictionary mapping secrets to decoded values.
If too few secrets were given, or too many, a ``ClientSecretsException`` is
raised.
raised; its text is sent in the HTTP response.
"""
string_key_to_enum = {e.value: e for e in Secrets}
result = {}
@ -84,6 +84,10 @@ def _extract_secrets(
string_key, string_value = header_value.strip().split(" ", 1)
key = string_key_to_enum[string_key]
value = b64decode(string_value)
if value == b"":
raise ClientSecretsException(
"Failed to decode secret {}".format(string_key)
)
if key in (Secrets.LEASE_CANCEL, Secrets.LEASE_RENEW) and len(value) != 32:
raise ClientSecretsException("Lease secrets must be 32 bytes long")
result[key] = value
@ -91,7 +95,9 @@ def _extract_secrets(
raise ClientSecretsException("Bad header value(s): {}".format(header_values))
if result.keys() != required_secrets:
raise ClientSecretsException(
"Expected {} secrets, got {}".format(required_secrets, result.keys())
"Expected {} in X-Tahoe-Authorization headers, got {}".format(
[r.value for r in required_secrets], list(result.keys())
)
)
return result
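For illustration, a request carrying a lease-renew secret would include a header like the one sketched below; an empty or undecodable value now raises ``ClientSecretsException``, whose text ends up in the 400 response body (this assumes the ``Secrets`` enum values match the header prefixes):

    from base64 import b64encode

    secret = b"\x01" * 32
    header_value = "lease-renew-secret " + b64encode(secret).decode("ascii")
    # Sent as: X-Tahoe-Authorization: lease-renew-secret AQEBAQ...
    # _extract_secrets([header_value], {Secrets.LEASE_RENEW})
    #   -> {Secrets.LEASE_RENEW: secret}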
@ -106,6 +112,9 @@ def _authorization_decorator(required_secrets):
def decorator(f):
@wraps(f)
def route(self, request, *args, **kwargs):
# Don't set text/html content type by default:
request.defaultContentType = None
with start_action(
action_type="allmydata:storage:http-server:handle-request",
method=request.method,
@ -113,13 +122,19 @@ def _authorization_decorator(required_secrets):
) as ctx:
try:
# Check Authorization header:
try:
auth_header = request.requestHeaders.getRawHeaders(
"Authorization", [""]
)[0].encode("utf-8")
except UnicodeError:
raise _HTTPError(http.BAD_REQUEST, "Bad Authorization header")
if not timing_safe_compare(
request.requestHeaders.getRawHeaders("Authorization", [""])[0].encode(
"utf-8"
),
auth_header,
swissnum_auth_header(self._swissnum),
):
raise _HTTPError(http.UNAUTHORIZED)
raise _HTTPError(
http.UNAUTHORIZED, "Wrong Authorization header"
)
# Check secrets:
authorization = request.requestHeaders.getRawHeaders(
@ -127,8 +142,8 @@ def _authorization_decorator(required_secrets):
)
try:
secrets = _extract_secrets(authorization, required_secrets)
except ClientSecretsException:
raise _HTTPError(http.BAD_REQUEST)
except ClientSecretsException as e:
raise _HTTPError(http.BAD_REQUEST, str(e))
# Run the business logic:
result = f(self, request, secrets, *args, **kwargs)
@ -269,8 +284,10 @@ class _HTTPError(Exception):
Raise from ``HTTPServer`` endpoint to return the given HTTP response code.
"""
def __init__(self, code: int):
def __init__(self, code: int, body: Optional[str] = None):
Exception.__init__(self, (code, body))
self.code = code
self.body = body
# CDDL schemas.
@ -369,13 +386,16 @@ class _ReadRangeProducer:
a request.
"""
request: Request
request: Optional[Request]
read_data: ReadData
result: Deferred
result: Optional[Deferred[bytes]]
start: int
remaining: int
def resumeProducing(self):
if self.result is None or self.request is None:
return
to_read = min(self.remaining, 65536)
data = self.read_data(self.start, to_read)
assert len(data) <= to_read
@ -424,7 +444,7 @@ class _ReadRangeProducer:
def read_range(
request: Request, read_data: ReadData, share_length: int
) -> Union[Deferred, bytes]:
) -> Union[Deferred[bytes], bytes]:
"""
Read an optional ``Range`` header, read the data via the given
callable, and write it to the request.
@ -461,6 +481,8 @@ def read_range(
raise _HTTPError(http.REQUESTED_RANGE_NOT_SATISFIABLE)
offset, end = range_header.ranges[0]
assert end is not None # we would have exited in the block above otherwise
# If we're being asked to read beyond the length of the share, just read
# less:
end = min(end, share_length)
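A worked example of the clamp: a ``Range: bytes=0-99`` request against a 50-byte share is truncated rather than rejected (werkzeug's parsed ranges use an exclusive end, so bytes=0-99 arrives as ``(0, 100)``):

    offset, end, share_length = 0, 100, 50
    end = min(end, share_length)  # 50: only bytes 0..49 are read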
@ -479,7 +501,7 @@ def read_range(
ContentRange("bytes", offset, end).to_header(),
)
d = Deferred()
d: Deferred[bytes] = Deferred()
request.registerProducer(
_ReadRangeProducer(
request, read_data_with_error_handling, d, offset, end - offset
@ -491,11 +513,15 @@ def read_range(
def _add_error_handling(app: Klein):
"""Add exception handlers to a Klein app."""
@app.handle_errors(_HTTPError)
def _http_error(_, request, failure):
"""Handle ``_HTTPError`` exceptions."""
request.setResponseCode(failure.value.code)
return b""
if failure.value.body is not None:
return failure.value.body
else:
return b""
@app.handle_errors(CDDLValidationError)
def _cddl_validation_error(_, request, failure):
@ -775,6 +801,7 @@ class HTTPServer(object):
)
def read_share_chunk(self, request, authorization, storage_index, share_number):
"""Read a chunk for an already uploaded immutable."""
request.setHeader("content-type", "application/octet-stream")
try:
bucket = self._storage_server.get_buckets(storage_index)[share_number]
except KeyError:
@ -880,6 +907,7 @@ class HTTPServer(object):
)
def read_mutable_chunk(self, request, authorization, storage_index, share_number):
"""Read a chunk from a mutable."""
request.setHeader("content-type", "application/octet-stream")
try:
share_length = self._storage_server.get_mutable_share_length(
@ -972,13 +1000,20 @@ class _TLSEndpointWrapper(object):
def build_nurl(
hostname: str, port: int, swissnum: str, certificate: CryptoCertificate
hostname: str,
port: int,
swissnum: str,
certificate: CryptoCertificate,
subscheme: Optional[str] = None,
) -> DecodedURL:
"""
Construct a HTTPS NURL, given the hostname, port, server swissnum, and x509
certificate for the server. Clients can then connect to the server using
this NURL.
"""
scheme = "pb"
if subscheme is not None:
scheme = f"{scheme}+{subscheme}"
return DecodedURL().replace(
fragment="v=1", # how we know this NURL is HTTP-based (i.e. not Foolscap)
host=hostname,
@ -990,7 +1025,7 @@ def build_nurl(
"ascii",
),
),
scheme="pb",
scheme=scheme,
)
@ -173,7 +173,9 @@ class LeaseInfo(object):
"""
return attr.assoc(
self,
_expiration_time=new_expire_time,
# MyPy is unhappy with this; the long-term solution is likely switching to
# the new @frozen attrs API, with type annotations.
_expiration_time=new_expire_time, # type: ignore[call-arg]
)
def is_renew_secret(self, candidate_secret):
@ -56,7 +56,7 @@ class HashedLeaseSerializer(object):
"""
Hash a lease secret for storage.
"""
return blake2b(secret, digest_size=32, encoder=RawEncoder())
return blake2b(secret, digest_size=32, encoder=RawEncoder)
@classmethod
def _hash_lease_info(cls, lease_info):
@ -55,7 +55,9 @@ class StorageServer(service.MultiService):
"""
Implement the business logic for the storage server.
"""
name = 'storage'
# The type in Twisted for services is wrong in 22.10...
# https://github.com/twisted/twisted/issues/10135
name = 'storage' # type: ignore[assignment]
# only the tests change this to anything else
LeaseCheckerClass = LeaseCheckingCrawler
@ -33,7 +33,7 @@ Ported to Python 3.
from __future__ import annotations
from six import ensure_text
from typing import Union, Callable, Any, Optional
from typing import Union, Callable, Any, Optional, cast
from os import urandom
import re
import time
@ -53,6 +53,7 @@ from twisted.python.failure import Failure
from twisted.web import http
from twisted.internet.task import LoopingCall
from twisted.internet import defer, reactor
from twisted.internet.interfaces import IReactorTime
from twisted.application import service
from twisted.plugin import (
getPlugins,
@ -70,6 +71,7 @@ from allmydata.interfaces import (
IServer,
IStorageServer,
IFoolscapStoragePlugin,
VersionMessage
)
from allmydata.grid_manager import (
create_grid_manager_verifier,
@ -77,6 +79,7 @@ from allmydata.grid_manager import (
from allmydata.crypto import (
ed25519,
)
from allmydata.util.tor_provider import _Provider as TorProvider
from allmydata.util import log, base32, connection_status
from allmydata.util.assertutil import precondition
from allmydata.util.observer import ObserverList
@ -87,7 +90,8 @@ from allmydata.util.deferredutil import async_to_deferred, race
from allmydata.storage.http_client import (
StorageClient, StorageClientImmutables, StorageClientGeneral,
ClientException as HTTPClientException, StorageClientMutables,
ReadVector, TestWriteVectors, WriteVector, TestVector, ClientException
ReadVector, TestWriteVectors, WriteVector, TestVector, ClientException,
StorageClientFactory
)
from .node import _Config
@ -202,8 +206,13 @@ class StorageFarmBroker(service.MultiService):
tub_maker,
node_config: _Config,
storage_client_config=None,
default_connection_handlers=None,
tor_provider: Optional[TorProvider]=None,
):
service.MultiService.__init__(self)
if default_connection_handlers is None:
default_connection_handlers = {"tcp": "tcp"}
assert permute_peers # False not implemented yet
self.permute_peers = permute_peers
self._tub_maker = tub_maker
@ -223,6 +232,8 @@ class StorageFarmBroker(service.MultiService):
self.introducer_client = None
self._threshold_listeners: list[tuple[float, defer.Deferred[Any]]] = []  # tuples of (threshold, Deferred)
self._connected_high_water_mark = 0
self._tor_provider = tor_provider
self._default_connection_handlers = default_connection_handlers
@log_call(action_type=u"storage-client:broker:set-static-servers")
def set_static_servers(self, servers):
@ -315,6 +326,8 @@ class StorageFarmBroker(service.MultiService):
server_id,
server["ann"],
grid_manager_verifier=gm_verifier,
default_connection_handlers=self._default_connection_handlers,
tor_provider=self._tor_provider
)
s.on_status_changed(lambda _: self._got_connection())
return s
@ -1049,7 +1062,7 @@ class HTTPNativeStorageServer(service.MultiService):
"connected".
"""
def __init__(self, server_id: bytes, announcement, reactor=reactor, grid_manager_verifier=None):
def __init__(self, server_id: bytes, announcement, default_connection_handlers: dict[str,str], reactor=reactor, grid_manager_verifier=None, tor_provider: Optional[TorProvider]=None):
service.MultiService.__init__(self)
assert isinstance(server_id, bytes)
self._server_id = server_id
@ -1057,6 +1070,10 @@ class HTTPNativeStorageServer(service.MultiService):
self._on_status_changed = ObserverList()
self._reactor = reactor
self._grid_manager_verifier = grid_manager_verifier
self._storage_client_factory = StorageClientFactory(
default_connection_handlers, tor_provider
)
furl = announcement["anonymous-storage-FURL"].encode("utf-8")
(
self._nickname,
@ -1074,7 +1091,7 @@ class HTTPNativeStorageServer(service.MultiService):
self._connection_status = connection_status.ConnectionStatus.unstarted()
self._version = None
self._last_connect_time = None
self._connecting_deferred = None
self._connecting_deferred: Optional[defer.Deferred[object]] = None
def get_permutation_seed(self):
return self._permutation_seed
@ -1236,21 +1253,24 @@ class HTTPNativeStorageServer(service.MultiService):
# version() calls before we are live talking to a server, it could only
# be one. See https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3992
def request(reactor, nurl: DecodedURL):
@async_to_deferred
async def request(reactor, nurl: DecodedURL):
# Since we're just using this one-off to check whether the NURL
# works, there's no need for a persistent pool or other fanciness.
pool = HTTPConnectionPool(reactor, persistent=False)
pool.retryAutomatically = False
return StorageClientGeneral(
StorageClient.from_nurl(nurl, reactor, pool)
).get_version()
storage_client = await self._storage_client_factory.create_storage_client(
nurl, reactor, pool
)
return await StorageClientGeneral(storage_client).get_version()
nurl = await _pick_a_http_server(reactor, self._nurls, request)
# If we've gotten this far, we've found a working NURL.
self._istorage_server = _HTTPStorageServer.from_http_client(
StorageClient.from_nurl(nurl, reactor)
storage_client = await self._storage_client_factory.create_storage_client(
nurl, cast(IReactorTime, reactor), None
)
self._istorage_server = _HTTPStorageServer.from_http_client(storage_client)
return self._istorage_server
try:
@ -1271,6 +1291,11 @@ class HTTPNativeStorageServer(service.MultiService):
if self._lc.running:
self._lc.stop()
self._failed_to_connect("shut down")
if self._istorage_server is not None:
client_shutting_down = self._istorage_server._http_client.shutdown()
result.addCallback(lambda _: client_shutting_down)
return result
@ -1484,7 +1509,7 @@ class _HTTPStorageServer(object):
"""
return _HTTPStorageServer(http_client=http_client)
def get_version(self):
def get_version(self) -> defer.Deferred[VersionMessage]:
return StorageClientGeneral(self._http_client).get_version()
@defer.inlineCallbacks
@ -8,7 +8,7 @@ import json
import os
from functools import partial
from os.path import join
from typing import Awaitable, Callable, Optional, Sequence, TypeVar, Union
from typing import Callable, Optional, Sequence, TypeVar, Union, Coroutine, Any, Tuple, cast, Generator
from twisted.internet import defer
from twisted.trial import unittest
@ -60,7 +60,7 @@ def make_simple_peer(
server: MemoryWormholeServer,
helper: TestingHelper,
messages: Sequence[JSONable],
) -> Callable[[], Awaitable[IWormhole]]:
) -> Callable[[], Coroutine[defer.Deferred[IWormhole], Any, IWormhole]]:
"""
Make a wormhole peer that just sends the given messages.
@ -102,18 +102,24 @@ A = TypeVar("A")
B = TypeVar("B")
def concurrently(
client: Callable[[], Awaitable[A]],
server: Callable[[], Awaitable[B]],
) -> defer.Deferred[tuple[A, B]]:
client: Callable[[], Union[
Coroutine[defer.Deferred[A], Any, A],
Generator[defer.Deferred[A], Any, A],
]],
server: Callable[[], Union[
Coroutine[defer.Deferred[B], Any, B],
Generator[defer.Deferred[B], Any, B],
]],
) -> defer.Deferred[Tuple[A, B]]:
"""
Run two asynchronous functions concurrently and asynchronously return a
tuple of both their results.
"""
return defer.gatherResults([
result = defer.gatherResults([
defer.Deferred.fromCoroutine(client()),
defer.Deferred.fromCoroutine(server()),
])
]).addCallback(tuple) # type: ignore
return cast(defer.Deferred[Tuple[A, B]], result)
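A usage sketch with hypothetical coroutines (not from the test suite):

    async def client_side():
        return "client-done"

    async def server_side():
        return "server-done"

    d = concurrently(client_side, server_side)
    # d eventually fires with the tuple ("client-done", "server-done")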
class Join(GridTestMixin, CLITestMixin, unittest.TestCase):
@ -1,16 +1,8 @@
"""
Tests for ``allmydata.scripts.tahoe_run``.
Ported to Python 3.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
from __future__ import annotations
import re
from six.moves import (
@ -31,6 +23,12 @@ from twisted.python.filepath import (
from twisted.internet.testing import (
MemoryReactor,
)
from twisted.python.failure import (
Failure,
)
from twisted.internet.error import (
ConnectionDone,
)
from twisted.internet.test.modulehelpers import (
AlternateReactor,
)
@ -147,6 +145,91 @@ class DaemonizeTheRealServiceTests(SyncTestCase):
)
class DaemonizeStopTests(SyncTestCase):
"""
Tests relating to stopping the daemon
"""
def setUp(self):
self.nodedir = FilePath(self.mktemp())
self.nodedir.makedirs()
config = ""
self.nodedir.child("tahoe.cfg").setContent(config.encode("ascii"))
self.nodedir.child("tahoe-client.tac").touch()
# arrange to know when reactor.stop() is called
self.reactor = MemoryReactor()
self.stop_calls = []
def record_stop():
self.stop_calls.append(object())
self.reactor.stop = record_stop
super().setUp()
def _make_daemon(self, extra_argv: list[str]) -> DaemonizeTheRealService:
"""
Create the daemonization service.
:param extra_argv: Extra arguments to pass between ``run`` and the
node path.
"""
options = parse_options(["run"] + extra_argv + [self.nodedir.path])
options.stdout = StringIO()
options.stderr = StringIO()
options.stdin = StringIO()
run_options = options.subOptions
return DaemonizeTheRealService(
"client",
self.nodedir.path,
run_options,
)
def _run_daemon(self) -> None:
"""
Simulate starting up the reactor so the daemon plugin can do its
stuff.
"""
# We happen to know that the service uses reactor.callWhenRunning
# to schedule all its work (though I couldn't tell you *why*).
# Make sure those scheduled calls happen.
waiting = self.reactor.whenRunningHooks[:]
del self.reactor.whenRunningHooks[:]
for f, a, k in waiting:
f(*a, **k)
def _close_stdin(self) -> None:
"""
Simulate closing the daemon plugin's stdin.
"""
# there should be a single reader: our StandardIO process
# reader for stdin. Simulate it closing.
for r in self.reactor.getReaders():
r.connectionLost(Failure(ConnectionDone()))
def test_stop_on_stdin_close(self):
"""
We stop when stdin is closed.
"""
with AlternateReactor(self.reactor):
service = self._make_daemon([])
service.startService()
self._run_daemon()
self._close_stdin()
self.assertEqual(len(self.stop_calls), 1)
def test_allow_stdin_close(self):
"""
If --allow-stdin-close is specified, closing stdin doesn't
stop the process.
"""
with AlternateReactor(self.reactor):
service = self._make_daemon(["--allow-stdin-close"])
service.startService()
self._run_daemon()
self._close_stdin()
self.assertEqual(self.stop_calls, [])
class RunTests(SyncTestCase):
"""
Tests for ``run``.
@ -40,7 +40,7 @@ from itertools import count
from sys import stderr
from attrs import frozen, define, field, Factory
from twisted.internet.defer import Deferred, DeferredQueue, succeed, Awaitable
from twisted.internet.defer import Deferred, DeferredQueue, succeed
from wormhole._interfaces import IWormhole
from wormhole.wormhole import create
from zope.interface import implementer
@ -63,14 +63,15 @@ class MemoryWormholeServer(object):
specific application id and relay URL combination.
"""
_apps: dict[ApplicationKey, _WormholeApp] = field(default=Factory(dict))
_waiters: dict[ApplicationKey, Deferred] = field(default=Factory(dict))
_waiters: dict[ApplicationKey, Deferred[IWormhole]] = field(default=Factory(dict))
def create(
self,
appid: str,
relay_url: str,
reactor: Any,
versions: Any={},
# Unfortunately we need a mutable default to match the real API
versions: Any={}, # noqa: B006
delegate: Optional[Any]=None,
journal: Optional[Any]=None,
tor: Optional[Any]=None,
@ -129,7 +130,7 @@ class TestingHelper(object):
key = (relay_url, appid)
if key in self._server._waiters:
raise ValueError(f"There is already a waiter for {key}")
d = Deferred()
d: Deferred[IWormhole] = Deferred()
self._server._waiters[key] = d
wormhole = await d
return wormhole
@ -165,7 +166,7 @@ class _WormholeApp(object):
appid/relay_url scope.
"""
wormholes: dict[WormholeCode, IWormhole] = field(default=Factory(dict))
_waiting: dict[WormholeCode, List[Deferred]] = field(default=Factory(dict))
_waiting: dict[WormholeCode, List[Deferred[_MemoryWormhole]]] = field(default=Factory(dict))
_counter: Iterator[int] = field(default=Factory(count))
def allocate_code(self, wormhole: IWormhole, code: Optional[WormholeCode]) -> WormholeCode:
@ -191,13 +192,13 @@ class _WormholeApp(object):
return code
def wait_for_wormhole(self, code: WormholeCode) -> Awaitable[_MemoryWormhole]:
def wait_for_wormhole(self, code: WormholeCode) -> Deferred[_MemoryWormhole]:
"""
Return a ``Deferred`` which fires with the next wormhole to be associated
with the given code. This is used to let the first end of a wormhole
rendezvous with the second end.
"""
d = Deferred()
d: Deferred[_MemoryWormhole] = Deferred()
self._waiting.setdefault(code, []).append(d)
return d
@ -241,8 +242,8 @@ class _MemoryWormhole(object):
_view: _WormholeServerView
_code: Optional[WormholeCode] = None
_payload: DeferredQueue = field(default=Factory(DeferredQueue))
_waiting_for_code: list[Deferred] = field(default=Factory(list))
_payload: DeferredQueue[WormholeMessage] = field(default=Factory(DeferredQueue))
_waiting_for_code: list[Deferred[WormholeCode]] = field(default=Factory(list))
def allocate_code(self) -> None:
if self._code is not None:
@ -264,7 +265,7 @@ class _MemoryWormhole(object):
def when_code(self) -> Deferred[WormholeCode]:
if self._code is None:
d = Deferred()
d: Deferred[WormholeCode] = Deferred()
self._waiting_for_code.append(d)
return d
return succeed(self._code)
@ -686,8 +686,8 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
def setUp(self):
self._http_client_pools = []
http_client.StorageClient.start_test_mode(self._got_new_http_connection_pool)
self.addCleanup(http_client.StorageClient.stop_test_mode)
http_client.StorageClientFactory.start_test_mode(self._got_new_http_connection_pool)
self.addCleanup(http_client.StorageClientFactory.stop_test_mode)
self.port_assigner = SameProcessStreamEndpointAssigner()
self.port_assigner.setUp()
self.addCleanup(self.port_assigner.tearDown)
@ -819,8 +819,8 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
helper_furl = f.read()
self.helper_furl = helper_furl
if self.numclients >= 4:
with open(os.path.join(basedirs[3], 'tahoe.cfg'), 'a+') as f:
if self.numclients >= 2:
with open(os.path.join(basedirs[1], 'tahoe.cfg'), 'a+') as f:
f.write(
"[client]\n"
"helper.furl = {}\n".format(helper_furl)
@ -836,9 +836,9 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
log.msg("CONNECTED")
# now find out where the web port was
self.webish_url = self.clients[0].getServiceNamed("webish").getURL()
if self.numclients >=4:
if self.numclients >=2:
# and the helper-using webport
self.helper_webish_url = self.clients[3].getServiceNamed("webish").getURL()
self.helper_webish_url = self.clients[1].getServiceNamed("webish").getURL()
def _generate_config(self, which, basedir, force_foolscap=False):
config = {}
@ -854,10 +854,10 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
("node", "tub.location"): allclients,
# client 0 runs a webserver and a helper
# client 3 runs a webserver but no helper
("node", "web.port"): {0, 3},
# client 1 runs a webserver but no helper
("node", "web.port"): {0, 1},
("node", "timeout.keepalive"): {0},
("node", "timeout.disconnect"): {3},
("node", "timeout.disconnect"): {1},
("helper", "enabled"): {0},
}
@ -78,18 +78,21 @@ class Version(GridTestMixin, AsyncTestCase, testutil.ShouldFailMixin, \
fso.nodedirs = [os.path.dirname(abspath_expanduser_unicode(str(storedir)))
for (i,ss,storedir)
in self.iterate_servers()]
fso.stdout = StringIO()
fso.stderr = StringIO()
# This attribute isn't defined on FindSharesOptions but `find_shares()`
# definitely expects it...
fso.stdout = StringIO() # type: ignore[attr-defined]
debug.find_shares(fso)
sharefiles = fso.stdout.getvalue().splitlines()
sharefiles = fso.stdout.getvalue().splitlines() # type: ignore[attr-defined]
expected = self.nm.default_encoding_parameters["n"]
self.assertThat(sharefiles, HasLength(expected))
# This attribute isn't defined on DebugOptions but `dump_share()`
# definitely expects it...
do = debug.DumpOptions()
do["filename"] = sharefiles[0]
do.stdout = StringIO()
do.stdout = StringIO() # type: ignore[attr-defined]
debug.dump_share(do)
output = do.stdout.getvalue()
output = do.stdout.getvalue() # type: ignore[attr-defined]
lines = set(output.splitlines())
self.assertTrue("Mutable slot found:" in lines, output)
self.assertTrue(" share_type: MDMF" in lines, output)
@ -104,10 +107,12 @@ class Version(GridTestMixin, AsyncTestCase, testutil.ShouldFailMixin, \
self.assertTrue(" verify-cap: %s" % vcap in lines, output)
cso = debug.CatalogSharesOptions()
cso.nodedirs = fso.nodedirs
cso.stdout = StringIO()
cso.stderr = StringIO()
# Definitely not options on CatalogSharesOptions, but the code does use
# stdout and stderr...
cso.stdout = StringIO() # type: ignore[attr-defined]
cso.stderr = StringIO() # type: ignore[attr-defined]
debug.catalog_shares(cso)
shares = cso.stdout.getvalue().splitlines()
shares = cso.stdout.getvalue().splitlines() # type: ignore[attr-defined]
oneshare = shares[0] # all shares should be MDMF
self.failIf(oneshare.startswith("UNKNOWN"), oneshare)
self.assertTrue(oneshare.startswith("MDMF"), oneshare)
@ -476,7 +476,7 @@ class GridTestMixin(object):
])
def set_up_grid(self, num_clients=1, num_servers=10,
client_config_hooks={}, oneshare=False):
client_config_hooks=None, oneshare=False):
"""
Create a Tahoe-LAFS storage grid.
@ -489,6 +489,8 @@ class GridTestMixin(object):
:return: ``None``
"""
if client_config_hooks is None:
client_config_hooks = {}
# self.basedir must be set
port_assigner = SameProcessStreamEndpointAssigner()
port_assigner.setUp()
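The ``None`` sentinel above replaces a mutable default because Python evaluates default values once, at definition time, so every call would share one dict. A minimal demonstration of the pitfall (flake8-bugbear's B006, the same pattern fixed throughout this change):

    def buggy(hooks={}):  # one dict shared by all calls
        hooks.setdefault("count", 0)
        hooks["count"] += 1
        return hooks

    assert buggy() == {"count": 1}
    assert buggy() == {"count": 2}  # same dict object, mutated again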
@ -39,6 +39,12 @@ class Producer(object):
self.consumer = consumer
self.done = False
def stopProducing(self):
pass
def pauseProducing(self):
pass
def resumeProducing(self):
"""Kick off streaming."""
self.iterate()
@ -43,7 +43,11 @@ from testtools.matchers import Equals
from zope.interface import implementer
from .common import SyncTestCase
from ..storage.http_common import get_content_type, CBOR_MIME_TYPE
from ..storage.http_common import (
get_content_type,
CBOR_MIME_TYPE,
response_is_not_html,
)
from ..storage.common import si_b2a
from ..storage.lease import LeaseInfo
from ..storage.server import StorageServer
@ -58,6 +62,7 @@ from ..storage.http_server import (
)
from ..storage.http_client import (
StorageClient,
StorageClientFactory,
ClientException,
StorageClientImmutables,
ImmutableCreateResult,
@ -257,6 +262,10 @@ class TestApp(object):
_add_error_handling(_app)
_swissnum = SWISSNUM_FOR_TEST # Match what the test client is using
@_authorized_route(_app, {}, "/noop", methods=["GET"])
def noop(self, request, authorization):
return "noop"
@_authorized_route(_app, {Secrets.UPLOAD}, "/upload_secret", methods=["GET"])
def validate_upload_secret(self, request, authorization):
if authorization == {Secrets.UPLOAD: b"MAGIC"}:
@ -311,7 +320,6 @@ def result_of(d):
+ "This is probably a test design issue."
)
class CustomHTTPServerTests(SyncTestCase):
"""
Tests that use a custom HTTP server.
@ -319,10 +327,10 @@ class CustomHTTPServerTests(SyncTestCase):
def setUp(self):
super(CustomHTTPServerTests, self).setUp()
StorageClient.start_test_mode(
StorageClientFactory.start_test_mode(
lambda pool: self.addCleanup(pool.closeCachedConnections)
)
self.addCleanup(StorageClient.stop_test_mode)
self.addCleanup(StorageClientFactory.stop_test_mode)
# Could be a fixture, but will only be used in this test class so not
# going to bother:
self._http_server = TestApp()
@ -331,18 +339,59 @@ class CustomHTTPServerTests(SyncTestCase):
DecodedURL.from_text("http://127.0.0.1"),
SWISSNUM_FOR_TEST,
treq=treq,
pool=None,
# We're using a Treq private API to get the reactor, alas, but only
# in a test, so not going to worry about it too much. This workaround
# could go away if https://github.com/twisted/treq/issues/226 were
# ever fixed.
clock=treq._agent._memoryReactor,
analyze_response=response_is_not_html,
)
self._http_server.clock = self.client._clock
def test_bad_swissnum_from_client(self) -> None:
"""
If the swissnum is invalid, a BAD REQUEST response code is returned.
"""
headers = Headers()
# The value is not UTF-8.
headers.addRawHeader("Authorization", b"\x00\xFF\x00\xFF")
response = result_of(
self.client._treq.request(
"GET",
DecodedURL.from_text("http://127.0.0.1/noop"),
headers=headers,
)
)
self.assertEqual(response.code, 400)
def test_bad_secret(self) -> None:
"""
If the secret is invalid (not base64), a BAD REQUEST
response code is returned.
"""
bad_secret = b"upload-secret []<>"
headers = Headers()
headers.addRawHeader(
"X-Tahoe-Authorization",
bad_secret,
)
response = result_of(
self.client.request(
"GET",
DecodedURL.from_text("http://127.0.0.1/upload_secret"),
headers=headers,
)
)
self.assertEqual(response.code, 400)
def test_authorization_enforcement(self):
"""
The requirement for secrets is enforced by the ``_authorized_route``
decorator; if they are not given, a 400 response code is returned.
Note that this refers to ``X-Tahoe-Authorization``, not the
``Authorization`` header used for the swissnum.
"""
# Without secret, get a 400 error.
response = result_of(
@ -468,8 +517,8 @@ class Reactor(Clock):
Clock.__init__(self)
self._queue = Queue()
def callFromThread(self, f, *args, **kwargs):
self._queue.put((f, args, kwargs))
def callFromThread(self, callable, *args, **kwargs):
self._queue.put((callable, args, kwargs))
def advance(self, *args, **kwargs):
Clock.advance(self, *args, **kwargs)
@ -485,10 +534,10 @@ class HttpTestFixture(Fixture):
"""
def _setUp(self):
StorageClient.start_test_mode(
StorageClientFactory.start_test_mode(
lambda pool: self.addCleanup(pool.closeCachedConnections)
)
self.addCleanup(StorageClient.stop_test_mode)
self.addCleanup(StorageClientFactory.stop_test_mode)
self.clock = Reactor()
self.tempdir = self.useFixture(TempDir())
# The global Cooperator used by Twisted (a) used by pull producers in
@ -512,7 +561,9 @@ class HttpTestFixture(Fixture):
DecodedURL.from_text("http://127.0.0.1"),
SWISSNUM_FOR_TEST,
treq=self.treq,
pool=None,
clock=self.clock,
analyze_response=response_is_not_html,
)
def result_of_with_flush(self, d):
@ -624,7 +675,9 @@ class GenericHTTPAPITests(SyncTestCase):
DecodedURL.from_text("http://127.0.0.1"),
b"something wrong",
treq=StubTreq(self.http.http_server.get_resource()),
pool=None,
clock=self.http.clock,
analyze_response=response_is_not_html,
)
)
with assert_fails_with_http_code(self, http.UNAUTHORIZED):
@ -1455,7 +1508,7 @@ class SharedImmutableMutableTestsMixin:
self.client.advise_corrupt_share(storage_index, 13, reason)
)
for (si, share_number) in [(storage_index, 11), (urandom(16), 13)]:
for si, share_number in [(storage_index, 11), (urandom(16), 13)]:
with assert_fails_with_http_code(self, http.NOT_FOUND):
self.http.result_of_with_flush(
self.client.advise_corrupt_share(si, share_number, reason)
@ -1,20 +1,13 @@
"""
Ported to Python 3.
"""
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
# Don't import bytes since it causes issues on (so far unported) modules on Python 2.
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, dict, list, object, range, max, min, str # noqa: F401
from __future__ import annotations
from past.builtins import chr as byteschr, long
from six import ensure_text
import os, re, sys, time, json
from typing import Optional
from bs4 import BeautifulSoup
@ -56,10 +49,12 @@ from .common_util import run_cli_unicode
class RunBinTahoeMixin(object):
def run_bintahoe(self, args, stdin=None, python_options=[], env=None):
def run_bintahoe(self, args, stdin=None, python_options: Optional[list[str]] = None, env=None):
# test_runner.run_bintahoe has better unicode support but doesn't
# support env yet and is also synchronous. If we could get rid of
# this in favor of that, though, it would probably be an improvement.
if python_options is None:
python_options = []
command = sys.executable
argv = python_options + ["-b", "-m", "allmydata.scripts.runner"] + args
@ -787,7 +782,7 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
def test_filesystem(self):
self.data = LARGE_DATA
d = self.set_up_nodes()
d = self.set_up_nodes(2)
def _new_happy_semantics(ign):
for c in self.clients:
c.encoding_params['happy'] = 1
@ -1088,7 +1083,9 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
headers["content-type"] = "multipart/form-data; boundary=%s" % str(sepbase, "ascii")
return self.POST2(urlpath, body, headers, use_helper)
def POST2(self, urlpath, body=b"", headers={}, use_helper=False):
def POST2(self, urlpath, body=b"", headers=None, use_helper=False):
if headers is None:
headers = {}
if use_helper:
url = self.helper_webish_url + urlpath
else:
@ -1409,7 +1406,7 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
rc,out,err = yield run_cli(verb, *args, nodeargs=nodeargs, **kwargs)
defer.returnValue((out,err))
def _check_ls(out_and_err, expected_children, unexpected_children=[]):
def _check_ls(out_and_err, expected_children, unexpected_children=()):
(out, err) = out_and_err
self.failUnlessEqual(err, "")
for s in expected_children:
@ -1749,6 +1746,10 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
return d
# In CI this test can be very slow, so give it a longer timeout:
test_filesystem.timeout = 360 # type: ignore[attr-defined]
def test_filesystem_with_cli_in_subprocess(self):
# We do this in a separate test so that test_filesystem doesn't skip if we can't run bin/tahoe.
@ -1,14 +1,6 @@
"""
Ported to Python 3.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
import os
from twisted.trial import unittest
@ -94,16 +86,15 @@ class LaunchTor(unittest.TestCase):
reactor = object()
private_dir = "private"
txtorcon = mock.Mock()
tpp = mock.Mock
tpp.tor_protocol = mock.Mock()
txtorcon.launch_tor = mock.Mock(return_value=tpp)
tor = mock.Mock
txtorcon.launch = mock.Mock(return_value=tor)
with mock.patch("allmydata.util.tor_provider.allocate_tcp_port",
return_value=999999):
d = tor_provider._launch_tor(reactor, tor_executable, private_dir,
txtorcon)
tor_control_endpoint, tor_control_proto = self.successResultOf(d)
self.assertIs(tor_control_proto, tpp.tor_protocol)
tor_control_endpoint, tor_result = self.successResultOf(d)
self.assertIs(tor_result, tor)
def test_launch(self):
return self._do_test_launch(None)
@ -161,6 +152,12 @@ class ConnectToTor(unittest.TestCase):
return self._do_test_connect(None, False)
class FakeTor:
"""Pretends to be a ``txtorcon.Tor`` instance."""
def __init__(self):
self.protocol = object()
class CreateOnion(unittest.TestCase):
def test_no_txtorcon(self):
with mock.patch("allmydata.util.tor_provider._import_txtorcon",
@ -171,6 +168,7 @@ class CreateOnion(unittest.TestCase):
self.assertEqual(str(f.value),
"Cannot create onion without txtorcon. "
"Please 'pip install tahoe-lafs[tor]' to fix this.")
def _do_test_launch(self, executable):
basedir = self.mktemp()
os.mkdir(basedir)
@ -181,9 +179,9 @@ class CreateOnion(unittest.TestCase):
if executable:
args.append("--tor-executable=%s" % executable)
cli_config = make_cli_config(basedir, *args)
protocol = object()
tor_instance = FakeTor()
launch_tor = mock.Mock(return_value=defer.succeed(("control_endpoint",
protocol)))
tor_instance)))
txtorcon = mock.Mock()
ehs = mock.Mock()
# This appears to be a native string in the real txtorcon object...
@ -204,8 +202,8 @@ class CreateOnion(unittest.TestCase):
launch_tor.assert_called_with(reactor, executable,
os.path.abspath(private_dir), txtorcon)
txtorcon.EphemeralHiddenService.assert_called_with("3457 127.0.0.1:999999")
ehs.add_to_tor.assert_called_with(protocol)
ehs.remove_from_tor.assert_called_with(protocol)
ehs.add_to_tor.assert_called_with(tor_instance.protocol)
ehs.remove_from_tor.assert_called_with(tor_instance.protocol)
expected = {"launch": "true",
"onion": "true",
@ -587,13 +585,14 @@ class Provider_Service(unittest.TestCase):
txtorcon = mock.Mock()
with mock_txtorcon(txtorcon):
p = tor_provider.create(reactor, cfg)
tor_instance = FakeTor()
tor_state = mock.Mock()
tor_state.protocol = object()
tor_state.protocol = tor_instance.protocol
ehs = mock.Mock()
ehs.add_to_tor = mock.Mock(return_value=defer.succeed(None))
ehs.remove_from_tor = mock.Mock(return_value=defer.succeed(None))
txtorcon.EphemeralHiddenService = mock.Mock(return_value=ehs)
launch_tor = mock.Mock(return_value=defer.succeed((None,tor_state.protocol)))
launch_tor = mock.Mock(return_value=defer.succeed((None,tor_instance)))
with mock.patch("allmydata.util.tor_provider._launch_tor",
launch_tor):
d = p.startService()
@ -628,9 +627,8 @@ class Provider_Service(unittest.TestCase):
txtorcon = mock.Mock()
with mock_txtorcon(txtorcon):
p = tor_provider.create(reactor, cfg)
tor_state = mock.Mock()
tor_state.protocol = object()
txtorcon.build_tor_connection = mock.Mock(return_value=tor_state)
tor_instance = FakeTor()
txtorcon.connect = mock.Mock(return_value=tor_instance)
ehs = mock.Mock()
ehs.add_to_tor = mock.Mock(return_value=defer.succeed(None))
ehs.remove_from_tor = mock.Mock(return_value=defer.succeed(None))
@ -642,12 +640,12 @@ class Provider_Service(unittest.TestCase):
yield flushEventualQueue()
self.successResultOf(d)
self.assertIs(p._onion_ehs, ehs)
self.assertIs(p._onion_tor_control_proto, tor_state.protocol)
self.assertIs(p._onion_tor_control_proto, tor_instance.protocol)
cfs.assert_called_with(reactor, "ep_desc")
txtorcon.build_tor_connection.assert_called_with(tcep)
txtorcon.connect.assert_called_with(reactor, tcep)
txtorcon.EphemeralHiddenService.assert_called_with("456 127.0.0.1:123",
b"private key")
ehs.add_to_tor.assert_called_with(tor_state.protocol)
ehs.add_to_tor.assert_called_with(tor_instance.protocol)
yield p.stopService()
ehs.remove_from_tor.assert_called_with(tor_state.protocol)
ehs.remove_from_tor.assert_called_with(tor_instance.protocol)
@ -565,7 +565,9 @@ class WebMixin(TimezoneMixin):
returnValue(data)
@inlineCallbacks
def HEAD(self, urlpath, return_response=False, headers={}):
def HEAD(self, urlpath, return_response=False, headers=None):
if headers is None:
headers = {}
url = self.webish_url + urlpath
response = yield treq.request("head", url, persistent=False,
headers=headers)
@ -573,7 +575,9 @@ class WebMixin(TimezoneMixin):
raise Error(response.code, response="")
returnValue( ("", response.code, response.headers) )
def PUT(self, urlpath, data, headers={}):
def PUT(self, urlpath, data, headers=None):
if headers is None:
headers = {}
url = self.webish_url + urlpath
return do_http("put", url, data=data, headers=headers)
@ -618,7 +622,9 @@ class WebMixin(TimezoneMixin):
body, headers = self.build_form(**fields)
return self.POST2(urlpath, body, headers)
def POST2(self, urlpath, body="", headers={}, followRedirect=False):
def POST2(self, urlpath, body="", headers=None, followRedirect=False):
if headers is None:
headers = {}
url = self.webish_url + urlpath
if isinstance(body, str):
body = body.encode("utf-8")
@ -276,6 +276,15 @@ class _SynchronousProducer(object):
consumer.write(self.body)
return succeed(None)
def stopProducing(self):
pass
def pauseProducing(self):
pass
def resumeProducing(self):
pass
def create_tahoe_treq_client(root=None):
"""
@ -25,7 +25,7 @@ class DBError(Exception):
def get_db(dbfile, stderr=sys.stderr,
create_version=(None, None), updaters={}, just_create=False, dbname="db",
create_version=(None, None), updaters=None, just_create=False, dbname="db",
):
"""Open or create the given db file. The parent directory must exist.
create_version=(SCHEMA, VERNUM), and SCHEMA must have a 'version' table.
@ -33,6 +33,8 @@ def get_db(dbfile, stderr=sys.stderr,
to get from ver=1 to ver=2. Returns a (sqlite3,db) tuple, or raises
DBError.
"""
if updaters is None:
updaters = {}
must_create = not os.path.exists(dbfile)
try:
db = sqlite3.connect(dbfile)
@ -14,6 +14,7 @@ from typing import (
TypeVar,
Optional,
Coroutine,
Generator
)
from typing_extensions import ParamSpec
@ -212,7 +213,7 @@ class WaitForDelayedCallsMixin(PollMixin):
def until(
action: Callable[[], defer.Deferred[Any]],
condition: Callable[[], bool],
) -> defer.Deferred[None]:
) -> Generator[Any, None, None]:
"""
Run a Deferred-returning function until a condition is true.
@ -1,10 +1,8 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import Any
from typing import Any, Optional
from typing_extensions import Literal
import os
from zope.interface import (
@ -23,6 +21,7 @@ from ..interfaces import (
)
from ..listeners import ListenerConfig
def _import_tor():
try:
from foolscap.connections import tor
@ -43,7 +42,7 @@ def can_hide_ip() -> Literal[True]:
def is_available() -> bool:
return not (_import_tor() is None or _import_txtorcon() is None)
def create(reactor: Any, config: Any, import_tor=None, import_txtorcon=None) -> IAddressFamily:
def create(reactor, config, import_tor=None, import_txtorcon=None) -> Optional[_Provider]:
"""
Create a new _Provider service (this is an IService so must be
hooked up to a parent or otherwise started).
@ -100,33 +99,31 @@ def _try_to_connect(reactor, endpoint_desc, stdout, txtorcon):
@inlineCallbacks
def _launch_tor(reactor, tor_executable, private_dir, txtorcon):
"""
Launches Tor, returns a corresponding ``(control endpoint string,
txtorcon.Tor instance)`` tuple.
"""
# TODO: handle default tor-executable
# TODO: it might be a good idea to find exactly which Tor we used,
# and record its absolute path into tahoe.cfg. This would protect
# us against one Tor being on $PATH at create-node time, but then a
# different Tor being present at node startup. OTOH, maybe we don't
# need to worry about it.
tor_config = txtorcon.TorConfig()
tor_config.DataDirectory = data_directory(private_dir)
# unix-domain control socket
tor_config.ControlPort = "unix:" + os.path.join(private_dir, "tor.control")
tor_control_endpoint_desc = tor_config.ControlPort
tor_control_endpoint_desc = "unix:" + os.path.join(private_dir, "tor.control")
tor_config.SOCKSPort = allocate_tcp_port()
tpp = yield txtorcon.launch_tor(
tor_config, reactor,
tor = yield txtorcon.launch(
reactor,
control_port=tor_control_endpoint_desc,
data_directory=data_directory(private_dir),
tor_binary=tor_executable,
socks_port=allocate_tcp_port(),
# can be useful when debugging; mirror Tor's output to ours
# stdout=sys.stdout,
# stderr=sys.stderr,
)
# now tor is launched and ready to be spoken to
# as a side effect, we've got an ITorControlProtocol ready to go
tor_control_proto = tpp.tor_protocol
# How/when to shut down the new process? For normal usage, the child
# tor will exit when it notices its parent (us) quit. Unit tests will
# mock out txtorcon.launch_tor(), so there will never be a real Tor
@ -136,7 +133,8 @@ def _launch_tor(reactor, tor_executable, private_dir, txtorcon):
# (because it's a TorProcessProtocol) which returns a Deferred
# that fires when Tor has actually exited.
returnValue((tor_control_endpoint_desc, tor_control_proto))
returnValue((tor_control_endpoint_desc, tor))
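A hedged sketch of how a caller consumes the new return value; where the old code reached into ``TorProcessProtocol.tor_protocol``, it now receives the ``txtorcon.Tor`` instance directly:

    from twisted.internet.defer import inlineCallbacks, returnValue

    @inlineCallbacks
    def start_tor(reactor, private_dir, txtorcon):
        control_ep, tor = yield _launch_tor(reactor, None, private_dir, txtorcon)
        # tor.protocol is the TorControlProtocol the old API exposed.
        returnValue((control_ep, tor.protocol))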
@inlineCallbacks
def _connect_to_tor(reactor, cli_config, txtorcon):
@ -170,8 +168,9 @@ async def create_config(reactor: Any, cli_config: Any) -> ListenerConfig:
if tor_executable:
tahoe_config_tor.append(("tor.executable", tor_executable))
print("launching Tor (to allocate .onion address)..", file=stdout)
(_, tor_control_proto) = await _launch_tor(
(_, tor) = await _launch_tor(
reactor, tor_executable, private_dir, txtorcon)
tor_control_proto = tor.protocol
print("Tor launched", file=stdout)
else:
print("connecting to Tor (to allocate .onion address)..", file=stdout)
@ -299,7 +298,7 @@ class _Provider(service.MultiService):
returnValue(tor_control_endpoint)
def _get_launched_tor(self, reactor):
# this fires with a tuple of (control_endpoint, tor_protocol)
# this fires with a tuple of (control_endpoint, txtorcon.Tor instance)
if not self._tor_launched:
self._tor_launched = OneShotObserverList()
private_dir = self._config.get_config_path("private")
@ -330,17 +329,20 @@ class _Provider(service.MultiService):
require("external_port")
require("private_key_file")
@inlineCallbacks
def _start_onion(self, reactor):
def get_tor_instance(self, reactor: object):
"""Return a ``Deferred`` that fires with a ``txtorcon.Tor`` instance."""
# launch tor, if necessary
if self._get_tor_config("launch", False, boolean=True):
(_, tor_control_proto) = yield self._get_launched_tor(reactor)
return self._get_launched_tor(reactor).addCallback(lambda t: t[1])
else:
controlport = self._get_tor_config("control.port", None)
tcep = clientFromString(reactor, controlport)
tor_state = yield self._txtorcon.build_tor_connection(tcep)
tor_control_proto = tor_state.protocol
return self._txtorcon.connect(reactor, tcep)
@inlineCallbacks
def _start_onion(self, reactor):
tor_instance = yield self.get_tor_instance(reactor)
tor_control_proto = tor_instance.protocol
local_port = int(self._get_tor_config("onion.local_port"))
external_port = int(self._get_tor_config("onion.external_port"))
@ -87,6 +87,7 @@ from allmydata.util.encodingutil import (
from allmydata.util import abbreviate
from allmydata.crypto.rsa import PrivateKey, PublicKey, create_signing_keypair_from_string
class WebError(Exception):
def __init__(self, text, code=http.BAD_REQUEST):
self.text = text
@ -723,16 +724,21 @@ def get_arg(req: IRequest, argname: str | bytes, default: Optional[T] = None, *,
:return: Either bytes or tuple of bytes.
"""
# Need to import here to prevent circular import:
from ..webish import TahoeLAFSRequest
if isinstance(argname, str):
argname_bytes = argname.encode("utf-8")
else:
argname_bytes = argname
results = []
if argname_bytes in req.args:
results: list[bytes] = []
if req.args is not None and argname_bytes in req.args:
results.extend(req.args[argname_bytes])
argname_unicode = str(argname_bytes, "utf-8")
if req.fields and argname_unicode in req.fields:
if isinstance(req, TahoeLAFSRequest) and req.fields and argname_unicode in req.fields:
# In all but one or two unit tests, the request will be a
# TahoeLAFSRequest.
value = req.fields[argname_unicode].value
if isinstance(value, str):
value = value.encode("utf-8")
@ -43,8 +43,9 @@ DAY = 24*HOUR
class OphandleTable(resource.Resource, service.Service):
"""Renders /operations/%d."""
name = "operations"
# The type in Twisted for services is wrong in 22.10...
# https://github.com/twisted/twisted/issues/10135
name = "operations" # type: ignore[assignment]
UNCOLLECTED_HANDLE_LIFETIME = 4*DAY
COLLECTED_HANDLE_LIFETIME = 1*DAY
@ -242,7 +242,9 @@ class TahoeLAFSSite(Site, object):
class WebishServer(service.MultiService):
name = "webish"
# The type in Twisted for services is wrong in 22.10...
# https://github.com/twisted/twisted/issues/10135
name = "webish" # type: ignore[assignment]
def __init__(self, client, webport, tempdir, nodeurl_path=None, staticdir=None,
clock=None, now_fn=time.time):
tox.ini
@ -121,20 +121,18 @@ commands =
[testenv:typechecks]
basepython = python3
skip_install = True
deps =
mypy
mypy-zope
mypy==1.3.0
# When 0.9.2 comes out it will work with mypy 1.3; it's just unreleased at the moment...
git+https://github.com/shoobx/mypy-zope@f276030
types-mock
types-six
types-PyYAML
types-pkg_resources
types-pyOpenSSL
git+https://github.com/warner/foolscap
# Twisted 21.2.0 introduces some type hints which we are not yet
# compatible with.
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3622
twisted<21.2.0
foolscap
# Upgrade when new releases come out:
Twisted==22.10.0
commands = mypy src