mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-02-23 10:40:23 +00:00
Merge pull request #1303 from tahoe-lafs/4029-http-storage-client-respects-request-to-use-tor
Http storage client respects request to use tor Fixes ticket:4029
This commit is contained in:
commit
839140c6ab
2
.github/workflows/ci.yml
vendored
2
.github/workflows/ci.yml
vendored
@ -166,7 +166,7 @@ jobs:
|
||||
matrix:
|
||||
include:
|
||||
- os: macos-12
|
||||
python-version: "3.9"
|
||||
python-version: "3.11"
|
||||
force-foolscap: false
|
||||
- os: windows-latest
|
||||
python-version: "3.11"
|
||||
|
@ -279,7 +279,7 @@ def introducer_furl(introducer, temp_dir):
|
||||
return furl
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
@pytest.fixture
|
||||
@log_call(
|
||||
action_type=u"integration:tor:introducer",
|
||||
include_args=["temp_dir", "flog_gatherer"],
|
||||
@ -342,7 +342,7 @@ def tor_introducer(reactor, temp_dir, flog_gatherer, request):
|
||||
return transport
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
@pytest.fixture
|
||||
def tor_introducer_furl(tor_introducer, temp_dir):
|
||||
furl_fname = join(temp_dir, 'introducer_tor', 'private', 'introducer.furl')
|
||||
while not exists(furl_fname):
|
||||
|
@ -19,6 +19,7 @@ from allmydata.test.common import (
|
||||
write_introducer,
|
||||
)
|
||||
from allmydata.client import read_config
|
||||
from allmydata.util.deferredutil import async_to_deferred
|
||||
|
||||
# see "conftest.py" for the fixtures (e.g. "tor_network")
|
||||
|
||||
@ -31,13 +32,26 @@ if sys.platform.startswith('win'):
|
||||
|
||||
@pytest_twisted.inlineCallbacks
|
||||
def test_onion_service_storage(reactor, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl):
|
||||
carol = yield _create_anonymous_node(reactor, 'carol', 8008, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl)
|
||||
dave = yield _create_anonymous_node(reactor, 'dave', 8009, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl)
|
||||
"""
|
||||
Two nodes and an introducer all configured to use Tahoe.
|
||||
|
||||
The two nodes can talk to the introducer and each other: we upload to one
|
||||
node, read from the other.
|
||||
"""
|
||||
carol = yield _create_anonymous_node(reactor, 'carol', 8008, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl, 2)
|
||||
dave = yield _create_anonymous_node(reactor, 'dave', 8009, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl, 2)
|
||||
yield util.await_client_ready(carol, minimum_number_of_servers=2, timeout=600)
|
||||
yield util.await_client_ready(dave, minimum_number_of_servers=2, timeout=600)
|
||||
yield upload_to_one_download_from_the_other(reactor, temp_dir, carol, dave)
|
||||
|
||||
|
||||
@async_to_deferred
|
||||
async def upload_to_one_download_from_the_other(reactor, temp_dir, upload_to: util.TahoeProcess, download_from: util.TahoeProcess):
|
||||
"""
|
||||
Ensure both nodes are connected to "a grid" by uploading something via one
|
||||
node, and retrieve it using the other.
|
||||
"""
|
||||
|
||||
# ensure both nodes are connected to "a grid" by uploading
|
||||
# something via carol, and retrieve it using dave.
|
||||
gold_path = join(temp_dir, "gold")
|
||||
with open(gold_path, "w") as f:
|
||||
f.write(
|
||||
@ -54,12 +68,12 @@ def test_onion_service_storage(reactor, request, temp_dir, flog_gatherer, tor_ne
|
||||
sys.executable,
|
||||
(
|
||||
sys.executable, '-b', '-m', 'allmydata.scripts.runner',
|
||||
'-d', join(temp_dir, 'carol'),
|
||||
'-d', upload_to.node_dir,
|
||||
'put', gold_path,
|
||||
),
|
||||
env=environ,
|
||||
)
|
||||
yield proto.done
|
||||
await proto.done
|
||||
cap = proto.output.getvalue().strip().split()[-1]
|
||||
print("capability: {}".format(cap))
|
||||
|
||||
@ -69,19 +83,18 @@ def test_onion_service_storage(reactor, request, temp_dir, flog_gatherer, tor_ne
|
||||
sys.executable,
|
||||
(
|
||||
sys.executable, '-b', '-m', 'allmydata.scripts.runner',
|
||||
'-d', join(temp_dir, 'dave'),
|
||||
'-d', download_from.node_dir,
|
||||
'get', cap,
|
||||
),
|
||||
env=environ,
|
||||
)
|
||||
yield proto.done
|
||||
|
||||
dave_got = proto.output.getvalue().strip()
|
||||
assert dave_got == open(gold_path, 'rb').read().strip()
|
||||
await proto.done
|
||||
download_got = proto.output.getvalue().strip()
|
||||
assert download_got == open(gold_path, 'rb').read().strip()
|
||||
|
||||
|
||||
@pytest_twisted.inlineCallbacks
|
||||
def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_gatherer, tor_network, introducer_furl):
|
||||
def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_gatherer, tor_network, introducer_furl, shares_total: int) -> util.TahoeProcess:
|
||||
node_dir = FilePath(temp_dir).child(name)
|
||||
web_port = "tcp:{}:interface=localhost".format(control_port + 2000)
|
||||
|
||||
@ -103,7 +116,7 @@ def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_
|
||||
'--listen', 'tor',
|
||||
'--shares-needed', '1',
|
||||
'--shares-happy', '1',
|
||||
'--shares-total', '2',
|
||||
'--shares-total', str(shares_total),
|
||||
node_dir.path,
|
||||
),
|
||||
env=environ,
|
||||
@ -113,9 +126,9 @@ def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_
|
||||
|
||||
# Which services should this client connect to?
|
||||
write_introducer(node_dir, "default", introducer_furl)
|
||||
util.basic_node_configuration(request, flog_gatherer, node_dir.path)
|
||||
|
||||
config = read_config(node_dir.path, "tub.port")
|
||||
config.set_config("node", "log_gatherer.furl", flog_gatherer)
|
||||
config.set_config("tor", "onion", "true")
|
||||
config.set_config("tor", "onion.external_port", "3457")
|
||||
config.set_config("tor", "control.port", f"tcp:port={control_port}:host=127.0.0.1")
|
||||
@ -125,3 +138,26 @@ def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_
|
||||
result = yield util._run_node(reactor, node_dir.path, request, None)
|
||||
print("okay, launched")
|
||||
return result
|
||||
|
||||
@pytest.mark.skipif(sys.platform.startswith('darwin'), reason='This test has issues on macOS')
|
||||
@pytest_twisted.inlineCallbacks
|
||||
def test_anonymous_client(reactor, request, temp_dir, flog_gatherer, tor_network, introducer_furl):
|
||||
"""
|
||||
A normal node (normie) and a normal introducer are configured, and one node
|
||||
(anonymoose) which is configured to be anonymous by talking via Tor.
|
||||
|
||||
Anonymoose should be able to communicate with normie.
|
||||
|
||||
TODO how to ensure that anonymoose is actually using Tor?
|
||||
"""
|
||||
normie = yield util._create_node(
|
||||
reactor, request, temp_dir, introducer_furl, flog_gatherer, "normie",
|
||||
web_port="tcp:9989:interface=localhost",
|
||||
storage=True, needed=1, happy=1, total=1,
|
||||
)
|
||||
yield util.await_client_ready(normie)
|
||||
|
||||
anonymoose = yield _create_anonymous_node(reactor, 'anonymoose', 8008, request, temp_dir, flog_gatherer, tor_network, introducer_furl, 1)
|
||||
yield util.await_client_ready(anonymoose, minimum_number_of_servers=1, timeout=600)
|
||||
|
||||
yield upload_to_one_download_from_the_other(reactor, temp_dir, normie, anonymoose)
|
||||
|
@ -140,7 +140,8 @@ class _MagicTextProtocol(ProcessProtocol):
|
||||
|
||||
def outReceived(self, data):
|
||||
data = str(data, sys.stdout.encoding)
|
||||
sys.stdout.write(self.name + data)
|
||||
for line in data.splitlines():
|
||||
sys.stdout.write(self.name + line + "\n")
|
||||
self._output.write(data)
|
||||
if not self.magic_seen.called and self._magic_text in self._output.getvalue():
|
||||
print("Saw '{}' in the logs".format(self._magic_text))
|
||||
@ -148,7 +149,8 @@ class _MagicTextProtocol(ProcessProtocol):
|
||||
|
||||
def errReceived(self, data):
|
||||
data = str(data, sys.stderr.encoding)
|
||||
sys.stdout.write(self.name + data)
|
||||
for line in data.splitlines():
|
||||
sys.stdout.write(self.name + line + "\n")
|
||||
|
||||
|
||||
def _cleanup_process_async(transport: IProcessTransport, allow_missing: bool) -> None:
|
||||
@ -311,6 +313,36 @@ def _run_node(reactor, node_dir, request, magic_text, finalize=True):
|
||||
return d
|
||||
|
||||
|
||||
def basic_node_configuration(request, flog_gatherer, node_dir: str):
|
||||
"""
|
||||
Setup common configuration options for a node, given a ``pytest`` request
|
||||
fixture.
|
||||
"""
|
||||
config_path = join(node_dir, 'tahoe.cfg')
|
||||
config = get_config(config_path)
|
||||
set_config(
|
||||
config,
|
||||
u'node',
|
||||
u'log_gatherer.furl',
|
||||
flog_gatherer,
|
||||
)
|
||||
force_foolscap = request.config.getoption("force_foolscap")
|
||||
assert force_foolscap in (True, False)
|
||||
set_config(
|
||||
config,
|
||||
'storage',
|
||||
'force_foolscap',
|
||||
str(force_foolscap),
|
||||
)
|
||||
set_config(
|
||||
config,
|
||||
'client',
|
||||
'force_foolscap',
|
||||
str(force_foolscap),
|
||||
)
|
||||
write_config(FilePath(config_path), config)
|
||||
|
||||
|
||||
def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, name, web_port,
|
||||
storage=True,
|
||||
magic_text=None,
|
||||
@ -351,29 +383,7 @@ def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, nam
|
||||
created_d = done_proto.done
|
||||
|
||||
def created(_):
|
||||
config_path = join(node_dir, 'tahoe.cfg')
|
||||
config = get_config(config_path)
|
||||
set_config(
|
||||
config,
|
||||
u'node',
|
||||
u'log_gatherer.furl',
|
||||
flog_gatherer,
|
||||
)
|
||||
force_foolscap = request.config.getoption("force_foolscap")
|
||||
assert force_foolscap in (True, False)
|
||||
set_config(
|
||||
config,
|
||||
'storage',
|
||||
'force_foolscap',
|
||||
str(force_foolscap),
|
||||
)
|
||||
set_config(
|
||||
config,
|
||||
'client',
|
||||
'force_foolscap',
|
||||
str(force_foolscap),
|
||||
)
|
||||
write_config(FilePath(config_path), config)
|
||||
basic_node_configuration(request, flog_gatherer, node_dir)
|
||||
created_d.addCallback(created)
|
||||
|
||||
d = Deferred()
|
||||
@ -621,16 +631,9 @@ def await_client_ready(tahoe, timeout=10, liveness=60*2, minimum_number_of_serve
|
||||
server['last_received_data']
|
||||
for server in servers
|
||||
]
|
||||
# if any times are null/None that server has never been
|
||||
# contacted (so it's down still, probably)
|
||||
never_received_data = server_times.count(None)
|
||||
if never_received_data > 0:
|
||||
print(f"waiting because {never_received_data} server(s) not contacted")
|
||||
time.sleep(1)
|
||||
continue
|
||||
|
||||
# check that all times are 'recent enough'
|
||||
if any([time.time() - t > liveness for t in server_times]):
|
||||
# check that all times are 'recent enough' (it's OK if _some_ servers
|
||||
# are down, we just want to make sure a sufficient number are up)
|
||||
if len([time.time() - t <= liveness for t in server_times if t is not None]) < minimum_number_of_servers:
|
||||
print("waiting because at least one server too old")
|
||||
time.sleep(1)
|
||||
continue
|
||||
|
2
newsfragments/4029.bugfix
Normal file
2
newsfragments/4029.bugfix
Normal file
@ -0,0 +1,2 @@
|
||||
The (still off-by-default) HTTP storage client will now use Tor when Tor-based client-side anonymity was requested.
|
||||
Previously it would use normal TCP connections and not be anonymous.
|
@ -21,11 +21,14 @@ in {
|
||||
pycddl = self.callPackage ./pycddl.nix { };
|
||||
txi2p = self.callPackage ./txi2p.nix { };
|
||||
|
||||
# Update the version of klein.
|
||||
# Some packages are of somewhat too-old versions - update them.
|
||||
klein = self.callPackage ./klein.nix {
|
||||
# Avoid infinite recursion.
|
||||
inherit (super) klein;
|
||||
};
|
||||
txtorcon = self.callPackage ./txtorcon.nix {
|
||||
inherit (super) txtorcon;
|
||||
};
|
||||
|
||||
# collections-extended is currently broken for Python 3.11 in nixpkgs but
|
||||
# we know where a working version lives.
|
||||
|
9
nix/txtorcon.nix
Normal file
9
nix/txtorcon.nix
Normal file
@ -0,0 +1,9 @@
|
||||
{ txtorcon, fetchPypi }:
|
||||
txtorcon.overrideAttrs (old: rec {
|
||||
pname = "txtorcon";
|
||||
version = "23.5.0";
|
||||
src = fetchPypi {
|
||||
inherit pname version;
|
||||
hash = "sha256-k/2Aqd1QX2mNCGT+k9uLapwRRLX+uRUwggtw7YmCZRw=";
|
||||
};
|
||||
})
|
7
setup.py
7
setup.py
@ -164,10 +164,9 @@ setup_requires = [
|
||||
]
|
||||
|
||||
tor_requires = [
|
||||
# This is exactly what `foolscap[tor]` means but pip resolves the pair of
|
||||
# dependencies "foolscap[i2p] foolscap[tor]" to "foolscap[i2p]" so we lose
|
||||
# this if we don't declare it ourselves!
|
||||
"txtorcon >= 0.17.0",
|
||||
# 23.5 added support for custom TLS contexts in web_agent(), which is
|
||||
# needed for the HTTP storage client to run over Tor.
|
||||
"txtorcon >= 23.5.0",
|
||||
]
|
||||
|
||||
i2p_requires = [
|
||||
|
@ -10,7 +10,6 @@ import weakref
|
||||
from typing import Optional, Iterable
|
||||
from base64 import urlsafe_b64encode
|
||||
from functools import partial
|
||||
# On Python 2 this will be the backported package:
|
||||
from configparser import NoSectionError
|
||||
|
||||
from foolscap.furl import (
|
||||
@ -47,7 +46,7 @@ from allmydata.util.encodingutil import get_filesystem_encoding
|
||||
from allmydata.util.abbreviate import parse_abbreviated_size
|
||||
from allmydata.util.time_format import parse_duration, parse_date
|
||||
from allmydata.util.i2p_provider import create as create_i2p_provider
|
||||
from allmydata.util.tor_provider import create as create_tor_provider
|
||||
from allmydata.util.tor_provider import create as create_tor_provider, _Provider as TorProvider
|
||||
from allmydata.stats import StatsProvider
|
||||
from allmydata.history import History
|
||||
from allmydata.interfaces import (
|
||||
@ -268,7 +267,7 @@ def create_client_from_config(config, _client_factory=None, _introducer_factory=
|
||||
introducer_clients = create_introducer_clients(config, main_tub, _introducer_factory)
|
||||
storage_broker = create_storage_farm_broker(
|
||||
config, default_connection_handlers, foolscap_connection_handlers,
|
||||
tub_options, introducer_clients
|
||||
tub_options, introducer_clients, tor_provider
|
||||
)
|
||||
|
||||
client = _client_factory(
|
||||
@ -464,7 +463,7 @@ def create_introducer_clients(config, main_tub, _introducer_factory=None):
|
||||
return introducer_clients
|
||||
|
||||
|
||||
def create_storage_farm_broker(config: _Config, default_connection_handlers, foolscap_connection_handlers, tub_options, introducer_clients):
|
||||
def create_storage_farm_broker(config: _Config, default_connection_handlers, foolscap_connection_handlers, tub_options, introducer_clients, tor_provider: Optional[TorProvider]):
|
||||
"""
|
||||
Create a StorageFarmBroker object, for use by Uploader/Downloader
|
||||
(and everybody else who wants to use storage servers)
|
||||
@ -500,6 +499,8 @@ def create_storage_farm_broker(config: _Config, default_connection_handlers, foo
|
||||
tub_maker=tub_creator,
|
||||
node_config=config,
|
||||
storage_client_config=storage_client_config,
|
||||
default_connection_handlers=default_connection_handlers,
|
||||
tor_provider=tor_provider,
|
||||
)
|
||||
for ic in introducer_clients:
|
||||
sb.use_introducer(ic)
|
||||
|
@ -15,6 +15,7 @@ from typing import (
|
||||
TypedDict,
|
||||
Set,
|
||||
Dict,
|
||||
Callable,
|
||||
)
|
||||
from base64 import b64encode
|
||||
from io import BytesIO
|
||||
@ -31,7 +32,7 @@ from collections_extended import RangeMap
|
||||
from werkzeug.datastructures import Range, ContentRange
|
||||
from twisted.web.http_headers import Headers
|
||||
from twisted.web import http
|
||||
from twisted.web.iweb import IPolicyForHTTPS, IResponse
|
||||
from twisted.web.iweb import IPolicyForHTTPS, IResponse, IAgent
|
||||
from twisted.internet.defer import inlineCallbacks, Deferred, succeed
|
||||
from twisted.internet.interfaces import (
|
||||
IOpenSSLClientConnectionCreator,
|
||||
@ -337,11 +338,25 @@ class StorageClient(object):
|
||||
|
||||
@classmethod
|
||||
def from_nurl(
|
||||
cls, nurl: DecodedURL, reactor, pool: Optional[HTTPConnectionPool] = None
|
||||
cls,
|
||||
nurl: DecodedURL,
|
||||
reactor,
|
||||
# TODO default_connection_handlers should really be a class, not a dict
|
||||
# of strings...
|
||||
default_connection_handlers: dict[str, str],
|
||||
pool: Optional[HTTPConnectionPool] = None,
|
||||
agent_factory: Optional[
|
||||
Callable[[object, IPolicyForHTTPS, HTTPConnectionPool], IAgent]
|
||||
] = None,
|
||||
) -> StorageClient:
|
||||
"""
|
||||
Create a ``StorageClient`` for the given NURL.
|
||||
"""
|
||||
# Safety check: if we're using normal TCP connections, we better not be
|
||||
# configured for Tor or I2P.
|
||||
if agent_factory is None:
|
||||
assert default_connection_handlers["tcp"] == "tcp"
|
||||
|
||||
assert nurl.fragment == "v=1"
|
||||
assert nurl.scheme == "pb"
|
||||
swissnum = nurl.path[0].encode("ascii")
|
||||
@ -353,11 +368,21 @@ class StorageClient(object):
|
||||
if cls.TEST_MODE_REGISTER_HTTP_POOL is not None:
|
||||
cls.TEST_MODE_REGISTER_HTTP_POOL(pool)
|
||||
|
||||
def default_agent_factory(
|
||||
reactor: object,
|
||||
tls_context_factory: IPolicyForHTTPS,
|
||||
pool: HTTPConnectionPool,
|
||||
) -> IAgent:
|
||||
return Agent(reactor, tls_context_factory, pool=pool)
|
||||
|
||||
if agent_factory is None:
|
||||
agent_factory = default_agent_factory
|
||||
|
||||
treq_client = HTTPClient(
|
||||
Agent(
|
||||
agent_factory(
|
||||
reactor,
|
||||
_StorageClientHTTPSPolicy(expected_spki_hash=certificate_hash),
|
||||
pool=pool,
|
||||
pool,
|
||||
)
|
||||
)
|
||||
|
||||
|
@ -51,6 +51,7 @@ from zope.interface import (
|
||||
)
|
||||
from twisted.python.failure import Failure
|
||||
from twisted.web import http
|
||||
from twisted.web.iweb import IAgent, IPolicyForHTTPS
|
||||
from twisted.internet.task import LoopingCall
|
||||
from twisted.internet import defer, reactor
|
||||
from twisted.application import service
|
||||
@ -77,6 +78,7 @@ from allmydata.grid_manager import (
|
||||
from allmydata.crypto import (
|
||||
ed25519,
|
||||
)
|
||||
from allmydata.util.tor_provider import _Provider as TorProvider
|
||||
from allmydata.util import log, base32, connection_status
|
||||
from allmydata.util.assertutil import precondition
|
||||
from allmydata.util.observer import ObserverList
|
||||
@ -202,8 +204,13 @@ class StorageFarmBroker(service.MultiService):
|
||||
tub_maker,
|
||||
node_config: _Config,
|
||||
storage_client_config=None,
|
||||
default_connection_handlers=None,
|
||||
tor_provider: Optional[TorProvider]=None,
|
||||
):
|
||||
service.MultiService.__init__(self)
|
||||
if default_connection_handlers is None:
|
||||
default_connection_handlers = {"tcp": "tcp"}
|
||||
|
||||
assert permute_peers # False not implemented yet
|
||||
self.permute_peers = permute_peers
|
||||
self._tub_maker = tub_maker
|
||||
@ -223,6 +230,8 @@ class StorageFarmBroker(service.MultiService):
|
||||
self.introducer_client = None
|
||||
self._threshold_listeners : list[tuple[float,defer.Deferred[Any]]]= [] # tuples of (threshold, Deferred)
|
||||
self._connected_high_water_mark = 0
|
||||
self._tor_provider = tor_provider
|
||||
self._default_connection_handlers = default_connection_handlers
|
||||
|
||||
@log_call(action_type=u"storage-client:broker:set-static-servers")
|
||||
def set_static_servers(self, servers):
|
||||
@ -315,6 +324,8 @@ class StorageFarmBroker(service.MultiService):
|
||||
server_id,
|
||||
server["ann"],
|
||||
grid_manager_verifier=gm_verifier,
|
||||
default_connection_handlers=self._default_connection_handlers,
|
||||
tor_provider=self._tor_provider
|
||||
)
|
||||
s.on_status_changed(lambda _: self._got_connection())
|
||||
return s
|
||||
@ -1049,7 +1060,7 @@ class HTTPNativeStorageServer(service.MultiService):
|
||||
"connected".
|
||||
"""
|
||||
|
||||
def __init__(self, server_id: bytes, announcement, reactor=reactor, grid_manager_verifier=None):
|
||||
def __init__(self, server_id: bytes, announcement, default_connection_handlers: dict[str,str], reactor=reactor, grid_manager_verifier=None, tor_provider: Optional[TorProvider]=None):
|
||||
service.MultiService.__init__(self)
|
||||
assert isinstance(server_id, bytes)
|
||||
self._server_id = server_id
|
||||
@ -1057,6 +1068,9 @@ class HTTPNativeStorageServer(service.MultiService):
|
||||
self._on_status_changed = ObserverList()
|
||||
self._reactor = reactor
|
||||
self._grid_manager_verifier = grid_manager_verifier
|
||||
self._tor_provider = tor_provider
|
||||
self._default_connection_handlers = default_connection_handlers
|
||||
|
||||
furl = announcement["anonymous-storage-FURL"].encode("utf-8")
|
||||
(
|
||||
self._nickname,
|
||||
@ -1218,6 +1232,26 @@ class HTTPNativeStorageServer(service.MultiService):
|
||||
self._connecting_deferred = connecting
|
||||
return connecting
|
||||
|
||||
async def _agent_factory(self) -> Optional[Callable[[object, IPolicyForHTTPS, HTTPConnectionPool],IAgent]]:
|
||||
"""Return a factory for ``twisted.web.iweb.IAgent``."""
|
||||
# TODO default_connection_handlers should really be an object, not a
|
||||
# dict, so we can ask "is this using Tor" without poking at a
|
||||
# dictionary with arbitrary strings... See
|
||||
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/4032
|
||||
handler = self._default_connection_handlers["tcp"]
|
||||
if handler == "tcp":
|
||||
return None
|
||||
if handler == "tor":
|
||||
assert self._tor_provider is not None
|
||||
tor_instance = await self._tor_provider.get_tor_instance(self._reactor)
|
||||
|
||||
def agent_factory(reactor: object, tls_context_factory: IPolicyForHTTPS, pool: HTTPConnectionPool) -> IAgent:
|
||||
assert reactor == self._reactor
|
||||
return tor_instance.web_agent(pool=pool, tls_context_factory=tls_context_factory)
|
||||
return agent_factory
|
||||
else:
|
||||
raise RuntimeError(f"Unsupported tcp connection handler: {handler}")
|
||||
|
||||
@async_to_deferred
|
||||
async def _pick_server_and_get_version(self):
|
||||
"""
|
||||
@ -1236,20 +1270,27 @@ class HTTPNativeStorageServer(service.MultiService):
|
||||
# version() calls before we are live talking to a server, it could only
|
||||
# be one. See https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3992
|
||||
|
||||
agent_factory = await self._agent_factory()
|
||||
|
||||
def request(reactor, nurl: DecodedURL):
|
||||
# Since we're just using this one off to check if the NURL
|
||||
# works, no need for persistent pool or other fanciness.
|
||||
pool = HTTPConnectionPool(reactor, persistent=False)
|
||||
pool.retryAutomatically = False
|
||||
return StorageClientGeneral(
|
||||
StorageClient.from_nurl(nurl, reactor, pool)
|
||||
StorageClient.from_nurl(
|
||||
nurl, reactor, self._default_connection_handlers,
|
||||
pool=pool, agent_factory=agent_factory)
|
||||
).get_version()
|
||||
|
||||
nurl = await _pick_a_http_server(reactor, self._nurls, request)
|
||||
|
||||
# If we've gotten this far, we've found a working NURL.
|
||||
self._istorage_server = _HTTPStorageServer.from_http_client(
|
||||
StorageClient.from_nurl(nurl, reactor)
|
||||
StorageClient.from_nurl(
|
||||
nurl, reactor, self._default_connection_handlers,
|
||||
agent_factory=agent_factory
|
||||
)
|
||||
)
|
||||
return self._istorage_server
|
||||
|
||||
|
@ -1,14 +1,6 @@
|
||||
"""
|
||||
Ported to Python 3.
|
||||
"""
|
||||
from __future__ import absolute_import
|
||||
from __future__ import division
|
||||
from __future__ import print_function
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from future.utils import PY2
|
||||
if PY2:
|
||||
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
|
||||
|
||||
import os
|
||||
from twisted.trial import unittest
|
||||
@ -94,16 +86,15 @@ class LaunchTor(unittest.TestCase):
|
||||
reactor = object()
|
||||
private_dir = "private"
|
||||
txtorcon = mock.Mock()
|
||||
tpp = mock.Mock
|
||||
tpp.tor_protocol = mock.Mock()
|
||||
txtorcon.launch_tor = mock.Mock(return_value=tpp)
|
||||
tor = mock.Mock
|
||||
txtorcon.launch = mock.Mock(return_value=tor)
|
||||
|
||||
with mock.patch("allmydata.util.tor_provider.allocate_tcp_port",
|
||||
return_value=999999):
|
||||
d = tor_provider._launch_tor(reactor, tor_executable, private_dir,
|
||||
txtorcon)
|
||||
tor_control_endpoint, tor_control_proto = self.successResultOf(d)
|
||||
self.assertIs(tor_control_proto, tpp.tor_protocol)
|
||||
tor_control_endpoint, tor_result = self.successResultOf(d)
|
||||
self.assertIs(tor_result, tor)
|
||||
|
||||
def test_launch(self):
|
||||
return self._do_test_launch(None)
|
||||
@ -161,6 +152,12 @@ class ConnectToTor(unittest.TestCase):
|
||||
return self._do_test_connect(None, False)
|
||||
|
||||
|
||||
class FakeTor:
|
||||
"""Pretends to be a ``txtorcon.Tor`` instance."""
|
||||
def __init__(self):
|
||||
self.protocol = object()
|
||||
|
||||
|
||||
class CreateOnion(unittest.TestCase):
|
||||
def test_no_txtorcon(self):
|
||||
with mock.patch("allmydata.util.tor_provider._import_txtorcon",
|
||||
@ -171,6 +168,7 @@ class CreateOnion(unittest.TestCase):
|
||||
self.assertEqual(str(f.value),
|
||||
"Cannot create onion without txtorcon. "
|
||||
"Please 'pip install tahoe-lafs[tor]' to fix this.")
|
||||
|
||||
def _do_test_launch(self, executable):
|
||||
basedir = self.mktemp()
|
||||
os.mkdir(basedir)
|
||||
@ -181,9 +179,9 @@ class CreateOnion(unittest.TestCase):
|
||||
if executable:
|
||||
args.append("--tor-executable=%s" % executable)
|
||||
cli_config = make_cli_config(basedir, *args)
|
||||
protocol = object()
|
||||
tor_instance = FakeTor()
|
||||
launch_tor = mock.Mock(return_value=defer.succeed(("control_endpoint",
|
||||
protocol)))
|
||||
tor_instance)))
|
||||
txtorcon = mock.Mock()
|
||||
ehs = mock.Mock()
|
||||
# This appears to be a native string in the real txtorcon object...
|
||||
@ -204,8 +202,8 @@ class CreateOnion(unittest.TestCase):
|
||||
launch_tor.assert_called_with(reactor, executable,
|
||||
os.path.abspath(private_dir), txtorcon)
|
||||
txtorcon.EphemeralHiddenService.assert_called_with("3457 127.0.0.1:999999")
|
||||
ehs.add_to_tor.assert_called_with(protocol)
|
||||
ehs.remove_from_tor.assert_called_with(protocol)
|
||||
ehs.add_to_tor.assert_called_with(tor_instance.protocol)
|
||||
ehs.remove_from_tor.assert_called_with(tor_instance.protocol)
|
||||
|
||||
expected = {"launch": "true",
|
||||
"onion": "true",
|
||||
@ -587,13 +585,14 @@ class Provider_Service(unittest.TestCase):
|
||||
txtorcon = mock.Mock()
|
||||
with mock_txtorcon(txtorcon):
|
||||
p = tor_provider.create(reactor, cfg)
|
||||
tor_instance = FakeTor()
|
||||
tor_state = mock.Mock()
|
||||
tor_state.protocol = object()
|
||||
tor_state.protocol = tor_instance.protocol
|
||||
ehs = mock.Mock()
|
||||
ehs.add_to_tor = mock.Mock(return_value=defer.succeed(None))
|
||||
ehs.remove_from_tor = mock.Mock(return_value=defer.succeed(None))
|
||||
txtorcon.EphemeralHiddenService = mock.Mock(return_value=ehs)
|
||||
launch_tor = mock.Mock(return_value=defer.succeed((None,tor_state.protocol)))
|
||||
launch_tor = mock.Mock(return_value=defer.succeed((None,tor_instance)))
|
||||
with mock.patch("allmydata.util.tor_provider._launch_tor",
|
||||
launch_tor):
|
||||
d = p.startService()
|
||||
@ -628,9 +627,8 @@ class Provider_Service(unittest.TestCase):
|
||||
txtorcon = mock.Mock()
|
||||
with mock_txtorcon(txtorcon):
|
||||
p = tor_provider.create(reactor, cfg)
|
||||
tor_state = mock.Mock()
|
||||
tor_state.protocol = object()
|
||||
txtorcon.build_tor_connection = mock.Mock(return_value=tor_state)
|
||||
tor_instance = FakeTor()
|
||||
txtorcon.connect = mock.Mock(return_value=tor_instance)
|
||||
ehs = mock.Mock()
|
||||
ehs.add_to_tor = mock.Mock(return_value=defer.succeed(None))
|
||||
ehs.remove_from_tor = mock.Mock(return_value=defer.succeed(None))
|
||||
@ -642,12 +640,12 @@ class Provider_Service(unittest.TestCase):
|
||||
yield flushEventualQueue()
|
||||
self.successResultOf(d)
|
||||
self.assertIs(p._onion_ehs, ehs)
|
||||
self.assertIs(p._onion_tor_control_proto, tor_state.protocol)
|
||||
self.assertIs(p._onion_tor_control_proto, tor_instance.protocol)
|
||||
cfs.assert_called_with(reactor, "ep_desc")
|
||||
txtorcon.build_tor_connection.assert_called_with(tcep)
|
||||
txtorcon.connect.assert_called_with(reactor, tcep)
|
||||
txtorcon.EphemeralHiddenService.assert_called_with("456 127.0.0.1:123",
|
||||
b"private key")
|
||||
ehs.add_to_tor.assert_called_with(tor_state.protocol)
|
||||
ehs.add_to_tor.assert_called_with(tor_instance.protocol)
|
||||
|
||||
yield p.stopService()
|
||||
ehs.remove_from_tor.assert_called_with(tor_state.protocol)
|
||||
ehs.remove_from_tor.assert_called_with(tor_instance.protocol)
|
||||
|
@ -2,14 +2,10 @@
|
||||
"""
|
||||
Ported to Python 3.
|
||||
"""
|
||||
from __future__ import absolute_import, print_function, with_statement
|
||||
from __future__ import division
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from future.utils import PY2
|
||||
if PY2:
|
||||
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Optional
|
||||
import os
|
||||
|
||||
from zope.interface import (
|
||||
@ -27,6 +23,7 @@ from ..interfaces import (
|
||||
IAddressFamily,
|
||||
)
|
||||
|
||||
|
||||
def _import_tor():
|
||||
try:
|
||||
from foolscap.connections import tor
|
||||
@ -41,7 +38,7 @@ def _import_txtorcon():
|
||||
except ImportError: # pragma: no cover
|
||||
return None
|
||||
|
||||
def create(reactor, config, import_tor=None, import_txtorcon=None):
|
||||
def create(reactor, config, import_tor=None, import_txtorcon=None) -> Optional[_Provider]:
|
||||
"""
|
||||
Create a new _Provider service (this is an IService so must be
|
||||
hooked up to a parent or otherwise started).
|
||||
@ -98,33 +95,31 @@ def _try_to_connect(reactor, endpoint_desc, stdout, txtorcon):
|
||||
|
||||
@inlineCallbacks
|
||||
def _launch_tor(reactor, tor_executable, private_dir, txtorcon):
|
||||
"""
|
||||
Launches Tor, returns a corresponding ``(control endpoint string,
|
||||
txtorcon.Tor instance)`` tuple.
|
||||
"""
|
||||
# TODO: handle default tor-executable
|
||||
# TODO: it might be a good idea to find exactly which Tor we used,
|
||||
# and record it's absolute path into tahoe.cfg . This would protect
|
||||
# us against one Tor being on $PATH at create-node time, but then a
|
||||
# different Tor being present at node startup. OTOH, maybe we don't
|
||||
# need to worry about it.
|
||||
tor_config = txtorcon.TorConfig()
|
||||
tor_config.DataDirectory = data_directory(private_dir)
|
||||
|
||||
# unix-domain control socket
|
||||
tor_config.ControlPort = "unix:" + os.path.join(private_dir, "tor.control")
|
||||
tor_control_endpoint_desc = tor_config.ControlPort
|
||||
tor_control_endpoint_desc = "unix:" + os.path.join(private_dir, "tor.control")
|
||||
|
||||
tor_config.SOCKSPort = allocate_tcp_port()
|
||||
|
||||
tpp = yield txtorcon.launch_tor(
|
||||
tor_config, reactor,
|
||||
tor = yield txtorcon.launch(
|
||||
reactor,
|
||||
control_port=tor_control_endpoint_desc,
|
||||
data_directory=data_directory(private_dir),
|
||||
tor_binary=tor_executable,
|
||||
socks_port=allocate_tcp_port(),
|
||||
# can be useful when debugging; mirror Tor's output to ours
|
||||
# stdout=sys.stdout,
|
||||
# stderr=sys.stderr,
|
||||
)
|
||||
|
||||
# now tor is launched and ready to be spoken to
|
||||
# as a side effect, we've got an ITorControlProtocol ready to go
|
||||
tor_control_proto = tpp.tor_protocol
|
||||
|
||||
# How/when to shut down the new process? for normal usage, the child
|
||||
# tor will exit when it notices its parent (us) quit. Unit tests will
|
||||
# mock out txtorcon.launch_tor(), so there will never be a real Tor
|
||||
@ -134,7 +129,8 @@ def _launch_tor(reactor, tor_executable, private_dir, txtorcon):
|
||||
# (because it's a TorProcessProtocol) which returns a Deferred
|
||||
# that fires when Tor has actually exited.
|
||||
|
||||
returnValue((tor_control_endpoint_desc, tor_control_proto))
|
||||
returnValue((tor_control_endpoint_desc, tor))
|
||||
|
||||
|
||||
@inlineCallbacks
|
||||
def _connect_to_tor(reactor, cli_config, txtorcon):
|
||||
@ -169,8 +165,9 @@ def create_config(reactor, cli_config):
|
||||
if tor_executable:
|
||||
tahoe_config_tor["tor.executable"] = tor_executable
|
||||
print("launching Tor (to allocate .onion address)..", file=stdout)
|
||||
(_, tor_control_proto) = yield _launch_tor(
|
||||
(_, tor) = yield _launch_tor(
|
||||
reactor, tor_executable, private_dir, txtorcon)
|
||||
tor_control_proto = tor.protocol
|
||||
print("Tor launched", file=stdout)
|
||||
else:
|
||||
print("connecting to Tor (to allocate .onion address)..", file=stdout)
|
||||
@ -294,7 +291,7 @@ class _Provider(service.MultiService):
|
||||
returnValue(tor_control_endpoint)
|
||||
|
||||
def _get_launched_tor(self, reactor):
|
||||
# this fires with a tuple of (control_endpoint, tor_protocol)
|
||||
# this fires with a tuple of (control_endpoint, txtorcon.Tor instance)
|
||||
if not self._tor_launched:
|
||||
self._tor_launched = OneShotObserverList()
|
||||
private_dir = self._config.get_config_path("private")
|
||||
@ -325,17 +322,20 @@ class _Provider(service.MultiService):
|
||||
require("external_port")
|
||||
require("private_key_file")
|
||||
|
||||
@inlineCallbacks
|
||||
def _start_onion(self, reactor):
|
||||
def get_tor_instance(self, reactor: object):
|
||||
"""Return a ``Deferred`` that fires with a ``txtorcon.Tor`` instance."""
|
||||
# launch tor, if necessary
|
||||
if self._get_tor_config("launch", False, boolean=True):
|
||||
(_, tor_control_proto) = yield self._get_launched_tor(reactor)
|
||||
return self._get_launched_tor(reactor).addCallback(lambda t: t[1])
|
||||
else:
|
||||
controlport = self._get_tor_config("control.port", None)
|
||||
tcep = clientFromString(reactor, controlport)
|
||||
tor_state = yield self._txtorcon.build_tor_connection(tcep)
|
||||
tor_control_proto = tor_state.protocol
|
||||
return self._txtorcon.connect(reactor, tcep)
|
||||
|
||||
@inlineCallbacks
|
||||
def _start_onion(self, reactor):
|
||||
tor_instance = yield self.get_tor_instance(reactor)
|
||||
tor_control_proto = tor_instance.protocol
|
||||
local_port = int(self._get_tor_config("onion.local_port"))
|
||||
external_port = int(self._get_tor_config("onion.external_port"))
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user