Merge remote-tracking branch 'origin/master' into 3998-new-pyopenssl

Jean-Paul Calderone committed 2023-06-13 10:35:51 -04:00
13 changed files with 238 additions and 121 deletions

View File

@@ -166,7 +166,7 @@ jobs:
       matrix:
         include:
           - os: macos-12
-            python-version: "3.9"
+            python-version: "3.11"
             force-foolscap: false
           - os: windows-latest
             python-version: "3.11"

View File

@@ -279,7 +279,7 @@ def introducer_furl(introducer, temp_dir):
     return furl


-@pytest.fixture(scope='session')
+@pytest.fixture
 @log_call(
     action_type=u"integration:tor:introducer",
     include_args=["temp_dir", "flog_gatherer"],
@@ -342,7 +342,7 @@ def tor_introducer(reactor, temp_dir, flog_gatherer, request):
     return transport


-@pytest.fixture(scope='session')
+@pytest.fixture
 def tor_introducer_furl(tor_introducer, temp_dir):
     furl_fname = join(temp_dir, 'introducer_tor', 'private', 'introducer.furl')
     while not exists(furl_fname):

View File

@@ -19,6 +19,7 @@ from allmydata.test.common import (
     write_introducer,
 )
 from allmydata.client import read_config
+from allmydata.util.deferredutil import async_to_deferred

 # see "conftest.py" for the fixtures (e.g. "tor_network")
@@ -31,13 +32,26 @@ if sys.platform.startswith('win'):
 @pytest_twisted.inlineCallbacks
 def test_onion_service_storage(reactor, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl):
-    carol = yield _create_anonymous_node(reactor, 'carol', 8008, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl)
-    dave = yield _create_anonymous_node(reactor, 'dave', 8009, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl)
+    """
+    Two nodes and an introducer all configured to use Tahoe.
+
+    The two nodes can talk to the introducer and each other: we upload to one
+    node, read from the other.
+    """
+    carol = yield _create_anonymous_node(reactor, 'carol', 8008, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl, 2)
+    dave = yield _create_anonymous_node(reactor, 'dave', 8009, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl, 2)
     yield util.await_client_ready(carol, minimum_number_of_servers=2, timeout=600)
     yield util.await_client_ready(dave, minimum_number_of_servers=2, timeout=600)
+    yield upload_to_one_download_from_the_other(reactor, temp_dir, carol, dave)
+
+
+@async_to_deferred
+async def upload_to_one_download_from_the_other(reactor, temp_dir, upload_to: util.TahoeProcess, download_from: util.TahoeProcess):
+    """
+    Ensure both nodes are connected to "a grid" by uploading something via one
+    node, and retrieve it using the other.
+    """
-    # ensure both nodes are connected to "a grid" by uploading
-    # something via carol, and retrieve it using dave.
     gold_path = join(temp_dir, "gold")
     with open(gold_path, "w") as f:
         f.write(
@@ -54,12 +68,12 @@ def test_onion_service_storage(reactor, request, temp_dir, flog_gatherer, tor_ne
         sys.executable,
         (
             sys.executable, '-b', '-m', 'allmydata.scripts.runner',
-            '-d', join(temp_dir, 'carol'),
+            '-d', upload_to.node_dir,
             'put', gold_path,
         ),
         env=environ,
     )
-    yield proto.done
+    await proto.done
     cap = proto.output.getvalue().strip().split()[-1]
     print("capability: {}".format(cap))
@@ -69,19 +83,18 @@ def test_onion_service_storage(reactor, request, temp_dir, flog_gatherer, tor_ne
         sys.executable,
         (
             sys.executable, '-b', '-m', 'allmydata.scripts.runner',
-            '-d', join(temp_dir, 'dave'),
+            '-d', download_from.node_dir,
             'get', cap,
         ),
         env=environ,
     )
-    yield proto.done
-    dave_got = proto.output.getvalue().strip()
-    assert dave_got == open(gold_path, 'rb').read().strip()
+    await proto.done
+    download_got = proto.output.getvalue().strip()
+    assert download_got == open(gold_path, 'rb').read().strip()


 @pytest_twisted.inlineCallbacks
-def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_gatherer, tor_network, introducer_furl):
+def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_gatherer, tor_network, introducer_furl, shares_total: int) -> util.TahoeProcess:
     node_dir = FilePath(temp_dir).child(name)
     web_port = "tcp:{}:interface=localhost".format(control_port + 2000)
@@ -103,7 +116,7 @@ def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_
             '--listen', 'tor',
             '--shares-needed', '1',
             '--shares-happy', '1',
-            '--shares-total', '2',
+            '--shares-total', str(shares_total),
             node_dir.path,
         ),
         env=environ,
@@ -113,9 +126,9 @@ def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_
     # Which services should this client connect to?
     write_introducer(node_dir, "default", introducer_furl)
+    util.basic_node_configuration(request, flog_gatherer, node_dir.path)
     config = read_config(node_dir.path, "tub.port")
-    config.set_config("node", "log_gatherer.furl", flog_gatherer)
     config.set_config("tor", "onion", "true")
     config.set_config("tor", "onion.external_port", "3457")
     config.set_config("tor", "control.port", f"tcp:port={control_port}:host=127.0.0.1")
@@ -125,3 +138,26 @@ def _create_anonymous_node(reactor, name, control_port, request, temp_dir, flog_
     result = yield util._run_node(reactor, node_dir.path, request, None)
     print("okay, launched")
     return result
+
+
+@pytest.mark.skipif(sys.platform.startswith('darwin'), reason='This test has issues on macOS')
+@pytest_twisted.inlineCallbacks
+def test_anonymous_client(reactor, request, temp_dir, flog_gatherer, tor_network, introducer_furl):
+    """
+    A normal node (normie) and a normal introducer are configured, and one node
+    (anonymoose) which is configured to be anonymous by talking via Tor.
+
+    Anonymoose should be able to communicate with normie.
+
+    TODO how to ensure that anonymoose is actually using Tor?
+    """
+    normie = yield util._create_node(
+        reactor, request, temp_dir, introducer_furl, flog_gatherer, "normie",
+        web_port="tcp:9989:interface=localhost",
+        storage=True, needed=1, happy=1, total=1,
+    )
+    yield util.await_client_ready(normie)
+
+    anonymoose = yield _create_anonymous_node(reactor, 'anonymoose', 8008, request, temp_dir, flog_gatherer, tor_network, introducer_furl, 1)
+    yield util.await_client_ready(anonymoose, minimum_number_of_servers=1, timeout=600)
+
+    yield upload_to_one_download_from_the_other(reactor, temp_dir, normie, anonymoose)

View File

@@ -140,7 +140,8 @@ class _MagicTextProtocol(ProcessProtocol):
     def outReceived(self, data):
         data = str(data, sys.stdout.encoding)
-        sys.stdout.write(self.name + data)
+        for line in data.splitlines():
+            sys.stdout.write(self.name + line + "\n")
         self._output.write(data)
         if not self.magic_seen.called and self._magic_text in self._output.getvalue():
             print("Saw '{}' in the logs".format(self._magic_text))
@@ -148,7 +149,8 @@ class _MagicTextProtocol(ProcessProtocol):
     def errReceived(self, data):
         data = str(data, sys.stderr.encoding)
-        sys.stdout.write(self.name + data)
+        for line in data.splitlines():
+            sys.stdout.write(self.name + line + "\n")


 def _cleanup_process_async(transport: IProcessTransport, allow_missing: bool) -> None:
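The two hunks above change _MagicTextProtocol from prefixing each received chunk to prefixing each line. A minimal standalone sketch of the resulting behaviour (the "carol: " prefix is just an illustrative value, not the exact name format the fixture uses):

    import sys

    def write_prefixed(name: str, data: str) -> None:
        # Mirrors the new outReceived/errReceived logic: prefix every line of
        # a chunk rather than only the first, so interleaved node logs stay
        # attributable to the node that produced them.
        for line in data.splitlines():
            sys.stdout.write(name + line + "\n")

    write_prefixed("carol: ", "first line\nsecond line\n")
    # carol: first line
    # carol: second line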
@@ -311,6 +313,36 @@ def _run_node(reactor, node_dir, request, magic_text, finalize=True):
     return d


+def basic_node_configuration(request, flog_gatherer, node_dir: str):
+    """
+    Setup common configuration options for a node, given a ``pytest`` request
+    fixture.
+    """
+    config_path = join(node_dir, 'tahoe.cfg')
+    config = get_config(config_path)
+    set_config(
+        config,
+        u'node',
+        u'log_gatherer.furl',
+        flog_gatherer,
+    )
+    force_foolscap = request.config.getoption("force_foolscap")
+    assert force_foolscap in (True, False)
+    set_config(
+        config,
+        'storage',
+        'force_foolscap',
+        str(force_foolscap),
+    )
+    set_config(
+        config,
+        'client',
+        'force_foolscap',
+        str(force_foolscap),
+    )
+    write_config(FilePath(config_path), config)
+
+
 def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, name, web_port,
                  storage=True,
                  magic_text=None,
@@ -351,29 +383,7 @@ def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, nam
     created_d = done_proto.done

     def created(_):
-        config_path = join(node_dir, 'tahoe.cfg')
-        config = get_config(config_path)
-        set_config(
-            config,
-            u'node',
-            u'log_gatherer.furl',
-            flog_gatherer,
-        )
-        force_foolscap = request.config.getoption("force_foolscap")
-        assert force_foolscap in (True, False)
-        set_config(
-            config,
-            'storage',
-            'force_foolscap',
-            str(force_foolscap),
-        )
-        set_config(
-            config,
-            'client',
-            'force_foolscap',
-            str(force_foolscap),
-        )
-        write_config(FilePath(config_path), config)
+        basic_node_configuration(request, flog_gatherer, node_dir)
     created_d.addCallback(created)

     d = Deferred()
@@ -621,16 +631,9 @@ def await_client_ready(tahoe, timeout=10, liveness=60*2, minimum_number_of_serve
             server['last_received_data']
            for server in servers
        ]
-        # if any times are null/None that server has never been
-        # contacted (so it's down still, probably)
-        never_received_data = server_times.count(None)
-        if never_received_data > 0:
-            print(f"waiting because {never_received_data} server(s) not contacted")
-            time.sleep(1)
-            continue
-
-        # check that all times are 'recent enough'
-        if any([time.time() - t > liveness for t in server_times]):
+        # check that all times are 'recent enough' (it's OK if _some_ servers
+        # are down, we just want to make sure a sufficient number are up)
+        if len([time.time() - t <= liveness for t in server_times if t is not None]) < minimum_number_of_servers:
             print("waiting because at least one server too old")
             time.sleep(1)
             continue

View File

@@ -0,0 +1,2 @@
+The (still off-by-default) HTTP storage client will now use Tor when Tor-based client-side anonymity was requested.
+Previously it would use normal TCP connections and not be anonymous.
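For context on what "Tor-based client-side anonymity was requested" means in practice, here is a rough sketch of the relevant tahoe.cfg options, written out with configparser; the option names follow Tahoe's anonymity configuration and the [tor] settings used in this commit's integration test, but treat the exact values as illustrative rather than a complete configuration:

    from configparser import ConfigParser

    def write_anonymous_client_config(path: str) -> None:
        # Sketch of a client config whose storage traffic (including the HTTP
        # storage client, after this change) is routed over Tor.
        cfg = ConfigParser()
        cfg.optionxform = str  # keep option-name case as written
        cfg["node"] = {"reveal-IP-address": "false"}
        cfg["connections"] = {"tcp": "tor"}
        cfg["tor"] = {"control.port": "tcp:port=9051:host=127.0.0.1"}  # or launch = true
        with open(path, "w") as f:
            cfg.write(f)

    write_anonymous_client_config("tahoe.cfg.example")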

View File

@@ -21,11 +21,14 @@ in {
   pycddl = self.callPackage ./pycddl.nix { };
   txi2p = self.callPackage ./txi2p.nix { };

-  # Update the version of klein.
+  # Some packages are of somewhat too-old versions - update them.
   klein = self.callPackage ./klein.nix {
     # Avoid infinite recursion.
     inherit (super) klein;
   };
+  txtorcon = self.callPackage ./txtorcon.nix {
+    inherit (super) txtorcon;
+  };

   # Update the version of pyopenssl.
   pyopenssl = self.callPackage ./pyopenssl.nix {

nix/txtorcon.nix (new file, 9 lines)
View File

@@ -0,0 +1,9 @@
+{ txtorcon, fetchPypi }:
+txtorcon.overrideAttrs (old: rec {
+  pname = "txtorcon";
+  version = "23.5.0";
+  src = fetchPypi {
+    inherit pname version;
+    hash = "sha256-k/2Aqd1QX2mNCGT+k9uLapwRRLX+uRUwggtw7YmCZRw=";
+  };
+})

View File

@@ -163,10 +163,9 @@ setup_requires = [
 ]

 tor_requires = [
-    # This is exactly what `foolscap[tor]` means but pip resolves the pair of
-    # dependencies "foolscap[i2p] foolscap[tor]" to "foolscap[i2p]" so we lose
-    # this if we don't declare it ourselves!
-    "txtorcon >= 0.17.0",
+    # 23.5 added support for custom TLS contexts in web_agent(), which is
+    # needed for the HTTP storage client to run over Tor.
+    "txtorcon >= 23.5.0",
 ]

 i2p_requires = [

View File

@@ -10,7 +10,6 @@ import weakref
 from typing import Optional, Iterable
 from base64 import urlsafe_b64encode
 from functools import partial
-# On Python 2 this will be the backported package:
 from configparser import NoSectionError

 from foolscap.furl import (
@@ -47,7 +46,7 @@ from allmydata.util.encodingutil import get_filesystem_encoding
 from allmydata.util.abbreviate import parse_abbreviated_size
 from allmydata.util.time_format import parse_duration, parse_date
 from allmydata.util.i2p_provider import create as create_i2p_provider
-from allmydata.util.tor_provider import create as create_tor_provider
+from allmydata.util.tor_provider import create as create_tor_provider, _Provider as TorProvider
 from allmydata.stats import StatsProvider
 from allmydata.history import History
 from allmydata.interfaces import (
@@ -268,7 +267,7 @@ def create_client_from_config(config, _client_factory=None, _introducer_factory=
     introducer_clients = create_introducer_clients(config, main_tub, _introducer_factory)
     storage_broker = create_storage_farm_broker(
         config, default_connection_handlers, foolscap_connection_handlers,
-        tub_options, introducer_clients
+        tub_options, introducer_clients, tor_provider
     )

     client = _client_factory(
@@ -464,7 +463,7 @@ def create_introducer_clients(config, main_tub, _introducer_factory=None):
     return introducer_clients

-def create_storage_farm_broker(config: _Config, default_connection_handlers, foolscap_connection_handlers, tub_options, introducer_clients):
+def create_storage_farm_broker(config: _Config, default_connection_handlers, foolscap_connection_handlers, tub_options, introducer_clients, tor_provider: Optional[TorProvider]):
     """
     Create a StorageFarmBroker object, for use by Uploader/Downloader
     (and everybody else who wants to use storage servers)
@@ -500,6 +499,8 @@ def create_storage_farm_broker(config: _Config, default_connection_handlers, foo
         tub_maker=tub_creator,
         node_config=config,
         storage_client_config=storage_client_config,
+        default_connection_handlers=default_connection_handlers,
+        tor_provider=tor_provider,
     )
     for ic in introducer_clients:
         sb.use_introducer(ic)

View File

@@ -15,6 +15,7 @@ from typing import (
     TypedDict,
     Set,
     Dict,
+    Callable,
 )
 from base64 import b64encode
 from io import BytesIO
@@ -31,7 +32,7 @@ from collections_extended import RangeMap
 from werkzeug.datastructures import Range, ContentRange
 from twisted.web.http_headers import Headers
 from twisted.web import http
-from twisted.web.iweb import IPolicyForHTTPS, IResponse
+from twisted.web.iweb import IPolicyForHTTPS, IResponse, IAgent
 from twisted.internet.defer import inlineCallbacks, Deferred, succeed
 from twisted.internet.interfaces import (
     IOpenSSLClientConnectionCreator,
@@ -334,11 +335,25 @@ class StorageClient(object):
     @classmethod
     def from_nurl(
-        cls, nurl: DecodedURL, reactor, pool: Optional[HTTPConnectionPool] = None
+        cls,
+        nurl: DecodedURL,
+        reactor,
+        # TODO default_connection_handlers should really be a class, not a dict
+        # of strings...
+        default_connection_handlers: dict[str, str],
+        pool: Optional[HTTPConnectionPool] = None,
+        agent_factory: Optional[
+            Callable[[object, IPolicyForHTTPS, HTTPConnectionPool], IAgent]
+        ] = None,
     ) -> StorageClient:
         """
         Create a ``StorageClient`` for the given NURL.
         """
+        # Safety check: if we're using normal TCP connections, we better not be
+        # configured for Tor or I2P.
+        if agent_factory is None:
+            assert default_connection_handlers["tcp"] == "tcp"
+
         assert nurl.fragment == "v=1"
         assert nurl.scheme == "pb"
         swissnum = nurl.path[0].encode("ascii")
@@ -350,11 +365,21 @@ class StorageClient(object):
         if cls.TEST_MODE_REGISTER_HTTP_POOL is not None:
             cls.TEST_MODE_REGISTER_HTTP_POOL(pool)

+        def default_agent_factory(
+            reactor: object,
+            tls_context_factory: IPolicyForHTTPS,
+            pool: HTTPConnectionPool,
+        ) -> IAgent:
+            return Agent(reactor, tls_context_factory, pool=pool)
+
+        if agent_factory is None:
+            agent_factory = default_agent_factory
+
         treq_client = HTTPClient(
-            Agent(
+            agent_factory(
                 reactor,
                 _StorageClientHTTPSPolicy(expected_spki_hash=certificate_hash),
-                pool=pool,
+                pool,
             )
         )
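A hedged usage sketch of the new from_nurl() hook: given a txtorcon Tor handle and a NURL string (both placeholders here), a caller can route the HTTP storage client over Tor by supplying an agent_factory built on web_agent(), the same shape HTTPNativeStorageServer uses in the next file:

    from hyperlink import DecodedURL
    from twisted.internet import reactor
    from twisted.web.client import HTTPConnectionPool
    from allmydata.storage.http_client import StorageClient

    def tor_storage_client(tor, nurl_text: str) -> StorageClient:
        # "tor" is assumed to be a txtorcon.Tor handle; nurl_text is a
        # placeholder for a real storage-server NURL.
        def agent_factory(reactor, tls_context_factory, pool):
            # Build the underlying IAgent on top of Tor instead of plain TCP.
            return tor.web_agent(pool=pool, tls_context_factory=tls_context_factory)

        pool = HTTPConnectionPool(reactor, persistent=False)
        return StorageClient.from_nurl(
            DecodedURL.from_text(nurl_text),
            reactor,
            {"tcp": "tor"},  # the node's default connection handlers
            pool=pool,
            agent_factory=agent_factory,
        )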

View File

@@ -51,6 +51,7 @@ from zope.interface import (
 )
 from twisted.python.failure import Failure
 from twisted.web import http
+from twisted.web.iweb import IAgent, IPolicyForHTTPS
 from twisted.internet.task import LoopingCall
 from twisted.internet import defer, reactor
 from twisted.application import service
@@ -77,6 +78,7 @@ from allmydata.grid_manager import (
 from allmydata.crypto import (
     ed25519,
 )
+from allmydata.util.tor_provider import _Provider as TorProvider
 from allmydata.util import log, base32, connection_status
 from allmydata.util.assertutil import precondition
 from allmydata.util.observer import ObserverList
@@ -202,8 +204,13 @@ class StorageFarmBroker(service.MultiService):
             tub_maker,
             node_config: _Config,
             storage_client_config=None,
+            default_connection_handlers=None,
+            tor_provider: Optional[TorProvider]=None,
     ):
         service.MultiService.__init__(self)
+        if default_connection_handlers is None:
+            default_connection_handlers = {"tcp": "tcp"}
+
         assert permute_peers # False not implemented yet
         self.permute_peers = permute_peers
         self._tub_maker = tub_maker
@@ -223,6 +230,8 @@ class StorageFarmBroker(service.MultiService):
         self.introducer_client = None
         self._threshold_listeners : list[tuple[float,defer.Deferred[Any]]]= [] # tuples of (threshold, Deferred)
         self._connected_high_water_mark = 0
+        self._tor_provider = tor_provider
+        self._default_connection_handlers = default_connection_handlers

     @log_call(action_type=u"storage-client:broker:set-static-servers")
     def set_static_servers(self, servers):
@@ -315,6 +324,8 @@ class StorageFarmBroker(service.MultiService):
             server_id,
             server["ann"],
             grid_manager_verifier=gm_verifier,
+            default_connection_handlers=self._default_connection_handlers,
+            tor_provider=self._tor_provider
         )
         s.on_status_changed(lambda _: self._got_connection())
         return s
@@ -1049,7 +1060,7 @@ class HTTPNativeStorageServer(service.MultiService):
     "connected".
     """

-    def __init__(self, server_id: bytes, announcement, reactor=reactor, grid_manager_verifier=None):
+    def __init__(self, server_id: bytes, announcement, default_connection_handlers: dict[str,str], reactor=reactor, grid_manager_verifier=None, tor_provider: Optional[TorProvider]=None):
         service.MultiService.__init__(self)
         assert isinstance(server_id, bytes)
         self._server_id = server_id
@@ -1057,6 +1068,9 @@ class HTTPNativeStorageServer(service.MultiService):
         self._on_status_changed = ObserverList()
         self._reactor = reactor
         self._grid_manager_verifier = grid_manager_verifier
+        self._tor_provider = tor_provider
+        self._default_connection_handlers = default_connection_handlers
+
         furl = announcement["anonymous-storage-FURL"].encode("utf-8")
         (
             self._nickname,
@@ -1218,6 +1232,26 @@ class HTTPNativeStorageServer(service.MultiService):
         self._connecting_deferred = connecting
         return connecting

+    async def _agent_factory(self) -> Optional[Callable[[object, IPolicyForHTTPS, HTTPConnectionPool],IAgent]]:
+        """Return a factory for ``twisted.web.iweb.IAgent``."""
+        # TODO default_connection_handlers should really be an object, not a
+        # dict, so we can ask "is this using Tor" without poking at a
+        # dictionary with arbitrary strings... See
+        # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/4032
+        handler = self._default_connection_handlers["tcp"]
+        if handler == "tcp":
+            return None
+        if handler == "tor":
+            assert self._tor_provider is not None
+            tor_instance = await self._tor_provider.get_tor_instance(self._reactor)
+
+            def agent_factory(reactor: object, tls_context_factory: IPolicyForHTTPS, pool: HTTPConnectionPool) -> IAgent:
+                assert reactor == self._reactor
+                return tor_instance.web_agent(pool=pool, tls_context_factory=tls_context_factory)
+            return agent_factory
+        else:
+            raise RuntimeError(f"Unsupported tcp connection handler: {handler}")
+
     @async_to_deferred
     async def _pick_server_and_get_version(self):
         """
@@ -1236,20 +1270,27 @@ class HTTPNativeStorageServer(service.MultiService):
         # version() calls before we are live talking to a server, it could only
         # be one. See https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3992

+        agent_factory = await self._agent_factory()
+
         def request(reactor, nurl: DecodedURL):
             # Since we're just using this one off to check if the NURL
             # works, no need for persistent pool or other fanciness.
             pool = HTTPConnectionPool(reactor, persistent=False)
             pool.retryAutomatically = False
             return StorageClientGeneral(
-                StorageClient.from_nurl(nurl, reactor, pool)
+                StorageClient.from_nurl(
+                    nurl, reactor, self._default_connection_handlers,
+                    pool=pool, agent_factory=agent_factory)
             ).get_version()

         nurl = await _pick_a_http_server(reactor, self._nurls, request)

         # If we've gotten this far, we've found a working NURL.
         self._istorage_server = _HTTPStorageServer.from_http_client(
-            StorageClient.from_nurl(nurl, reactor)
+            StorageClient.from_nurl(
+                nurl, reactor, self._default_connection_handlers,
+                agent_factory=agent_factory
+            )
         )
         return self._istorage_server

View File

@@ -1,14 +1,6 @@
 """
 Ported to Python 3.
 """
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-
-from future.utils import PY2
-if PY2:
-    from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min  # noqa: F401

 import os
 from twisted.trial import unittest
@@ -94,16 +86,15 @@ class LaunchTor(unittest.TestCase):
         reactor = object()
         private_dir = "private"
         txtorcon = mock.Mock()
-        tpp = mock.Mock
-        tpp.tor_protocol = mock.Mock()
-        txtorcon.launch_tor = mock.Mock(return_value=tpp)
+        tor = mock.Mock
+        txtorcon.launch = mock.Mock(return_value=tor)

         with mock.patch("allmydata.util.tor_provider.allocate_tcp_port",
                         return_value=999999):
             d = tor_provider._launch_tor(reactor, tor_executable, private_dir,
                                          txtorcon)
-        tor_control_endpoint, tor_control_proto = self.successResultOf(d)
-        self.assertIs(tor_control_proto, tpp.tor_protocol)
+        tor_control_endpoint, tor_result = self.successResultOf(d)
+        self.assertIs(tor_result, tor)

     def test_launch(self):
         return self._do_test_launch(None)
@@ -161,6 +152,12 @@ class ConnectToTor(unittest.TestCase):
         return self._do_test_connect(None, False)


+class FakeTor:
+    """Pretends to be a ``txtorcon.Tor`` instance."""
+    def __init__(self):
+        self.protocol = object()
+
+
 class CreateOnion(unittest.TestCase):
     def test_no_txtorcon(self):
         with mock.patch("allmydata.util.tor_provider._import_txtorcon",
@@ -171,6 +168,7 @@ class CreateOnion(unittest.TestCase):
         self.assertEqual(str(f.value),
                          "Cannot create onion without txtorcon. "
                          "Please 'pip install tahoe-lafs[tor]' to fix this.")
+
     def _do_test_launch(self, executable):
         basedir = self.mktemp()
         os.mkdir(basedir)
@@ -181,9 +179,9 @@ class CreateOnion(unittest.TestCase):
         if executable:
             args.append("--tor-executable=%s" % executable)
         cli_config = make_cli_config(basedir, *args)
-        protocol = object()
+        tor_instance = FakeTor()
         launch_tor = mock.Mock(return_value=defer.succeed(("control_endpoint",
-                                                           protocol)))
+                                                           tor_instance)))
         txtorcon = mock.Mock()
         ehs = mock.Mock()
         # This appears to be a native string in the real txtorcon object...
@@ -204,8 +202,8 @@ class CreateOnion(unittest.TestCase):
         launch_tor.assert_called_with(reactor, executable,
                                       os.path.abspath(private_dir), txtorcon)
         txtorcon.EphemeralHiddenService.assert_called_with("3457 127.0.0.1:999999")
-        ehs.add_to_tor.assert_called_with(protocol)
-        ehs.remove_from_tor.assert_called_with(protocol)
+        ehs.add_to_tor.assert_called_with(tor_instance.protocol)
+        ehs.remove_from_tor.assert_called_with(tor_instance.protocol)

         expected = {"launch": "true",
                     "onion": "true",
@@ -587,13 +585,14 @@ class Provider_Service(unittest.TestCase):
         txtorcon = mock.Mock()
         with mock_txtorcon(txtorcon):
             p = tor_provider.create(reactor, cfg)
+        tor_instance = FakeTor()
         tor_state = mock.Mock()
-        tor_state.protocol = object()
+        tor_state.protocol = tor_instance.protocol
         ehs = mock.Mock()
         ehs.add_to_tor = mock.Mock(return_value=defer.succeed(None))
         ehs.remove_from_tor = mock.Mock(return_value=defer.succeed(None))
         txtorcon.EphemeralHiddenService = mock.Mock(return_value=ehs)
-        launch_tor = mock.Mock(return_value=defer.succeed((None,tor_state.protocol)))
+        launch_tor = mock.Mock(return_value=defer.succeed((None,tor_instance)))
         with mock.patch("allmydata.util.tor_provider._launch_tor",
                         launch_tor):
             d = p.startService()
@@ -628,9 +627,8 @@ class Provider_Service(unittest.TestCase):
         txtorcon = mock.Mock()
         with mock_txtorcon(txtorcon):
             p = tor_provider.create(reactor, cfg)
-        tor_state = mock.Mock()
-        tor_state.protocol = object()
-        txtorcon.build_tor_connection = mock.Mock(return_value=tor_state)
+        tor_instance = FakeTor()
+        txtorcon.connect = mock.Mock(return_value=tor_instance)
         ehs = mock.Mock()
         ehs.add_to_tor = mock.Mock(return_value=defer.succeed(None))
         ehs.remove_from_tor = mock.Mock(return_value=defer.succeed(None))
@@ -642,12 +640,12 @@ class Provider_Service(unittest.TestCase):
             yield flushEventualQueue()
         self.successResultOf(d)
         self.assertIs(p._onion_ehs, ehs)
-        self.assertIs(p._onion_tor_control_proto, tor_state.protocol)
+        self.assertIs(p._onion_tor_control_proto, tor_instance.protocol)
         cfs.assert_called_with(reactor, "ep_desc")
-        txtorcon.build_tor_connection.assert_called_with(tcep)
+        txtorcon.connect.assert_called_with(reactor, tcep)
         txtorcon.EphemeralHiddenService.assert_called_with("456 127.0.0.1:123",
                                                            b"private key")
-        ehs.add_to_tor.assert_called_with(tor_state.protocol)
+        ehs.add_to_tor.assert_called_with(tor_instance.protocol)

         yield p.stopService()
-        ehs.remove_from_tor.assert_called_with(tor_state.protocol)
+        ehs.remove_from_tor.assert_called_with(tor_instance.protocol)

View File

@@ -2,14 +2,10 @@
 """
 Ported to Python 3.
 """
-from __future__ import absolute_import, print_function, with_statement
-from __future__ import division
-from __future__ import unicode_literals
-
-from future.utils import PY2
-if PY2:
-    from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min  # noqa: F401
+from __future__ import annotations

+from typing import Optional
 import os

 from zope.interface import (
@@ -27,6 +23,7 @@ from ..interfaces import (
     IAddressFamily,
 )

+
 def _import_tor():
     try:
         from foolscap.connections import tor
@@ -41,7 +38,7 @@ def _import_txtorcon():
     except ImportError:  # pragma: no cover
         return None

-def create(reactor, config, import_tor=None, import_txtorcon=None):
+def create(reactor, config, import_tor=None, import_txtorcon=None) -> Optional[_Provider]:
     """
     Create a new _Provider service (this is an IService so must be
     hooked up to a parent or otherwise started).
@@ -98,33 +95,31 @@ def _try_to_connect(reactor, endpoint_desc, stdout, txtorcon):
 @inlineCallbacks
 def _launch_tor(reactor, tor_executable, private_dir, txtorcon):
+    """
+    Launches Tor, returns a corresponding ``(control endpoint string,
+    txtorcon.Tor instance)`` tuple.
+    """
     # TODO: handle default tor-executable
     # TODO: it might be a good idea to find exactly which Tor we used,
     # and record it's absolute path into tahoe.cfg . This would protect
     # us against one Tor being on $PATH at create-node time, but then a
     # different Tor being present at node startup. OTOH, maybe we don't
     # need to worry about it.
-    tor_config = txtorcon.TorConfig()
-    tor_config.DataDirectory = data_directory(private_dir)

     # unix-domain control socket
-    tor_config.ControlPort = "unix:" + os.path.join(private_dir, "tor.control")
-    tor_control_endpoint_desc = tor_config.ControlPort
+    tor_control_endpoint_desc = "unix:" + os.path.join(private_dir, "tor.control")

-    tor_config.SOCKSPort = allocate_tcp_port()
-
-    tpp = yield txtorcon.launch_tor(
-        tor_config, reactor,
+    tor = yield txtorcon.launch(
+        reactor,
+        control_port=tor_control_endpoint_desc,
+        data_directory=data_directory(private_dir),
         tor_binary=tor_executable,
+        socks_port=allocate_tcp_port(),
         # can be useful when debugging; mirror Tor's output to ours
         # stdout=sys.stdout,
         # stderr=sys.stderr,
     )

-    # now tor is launched and ready to be spoken to
-    # as a side effect, we've got an ITorControlProtocol ready to go
-    tor_control_proto = tpp.tor_protocol
-
     # How/when to shut down the new process? for normal usage, the child
     # tor will exit when it notices its parent (us) quit. Unit tests will
     # mock out txtorcon.launch_tor(), so there will never be a real Tor
@@ -134,7 +129,8 @@ def _launch_tor(reactor, tor_executable, private_dir, txtorcon):
     # (because it's a TorProcessProtocol) which returns a Deferred
     # that fires when Tor has actually exited.

-    returnValue((tor_control_endpoint_desc, tor_control_proto))
+    returnValue((tor_control_endpoint_desc, tor))


 @inlineCallbacks
 def _connect_to_tor(reactor, cli_config, txtorcon):
@@ -169,8 +165,9 @@ def create_config(reactor, cli_config):
         if tor_executable:
             tahoe_config_tor["tor.executable"] = tor_executable
         print("launching Tor (to allocate .onion address)..", file=stdout)
-        (_, tor_control_proto) = yield _launch_tor(
+        (_, tor) = yield _launch_tor(
             reactor, tor_executable, private_dir, txtorcon)
+        tor_control_proto = tor.protocol
         print("Tor launched", file=stdout)
     else:
         print("connecting to Tor (to allocate .onion address)..", file=stdout)
@@ -294,7 +291,7 @@ class _Provider(service.MultiService):
         returnValue(tor_control_endpoint)

     def _get_launched_tor(self, reactor):
-        # this fires with a tuple of (control_endpoint, tor_protocol)
+        # this fires with a tuple of (control_endpoint, txtorcon.Tor instance)
         if not self._tor_launched:
             self._tor_launched = OneShotObserverList()
             private_dir = self._config.get_config_path("private")
@@ -325,17 +322,20 @@ class _Provider(service.MultiService):
         require("external_port")
         require("private_key_file")

-    @inlineCallbacks
-    def _start_onion(self, reactor):
+    def get_tor_instance(self, reactor: object):
+        """Return a ``Deferred`` that fires with a ``txtorcon.Tor`` instance."""
         # launch tor, if necessary
         if self._get_tor_config("launch", False, boolean=True):
-            (_, tor_control_proto) = yield self._get_launched_tor(reactor)
+            return self._get_launched_tor(reactor).addCallback(lambda t: t[1])
         else:
             controlport = self._get_tor_config("control.port", None)
             tcep = clientFromString(reactor, controlport)
-            tor_state = yield self._txtorcon.build_tor_connection(tcep)
-            tor_control_proto = tor_state.protocol
+            return self._txtorcon.connect(reactor, tcep)
+
+    @inlineCallbacks
+    def _start_onion(self, reactor):
+        tor_instance = yield self.get_tor_instance(reactor)
+        tor_control_proto = tor_instance.protocol

         local_port = int(self._get_tor_config("onion.local_port"))
         external_port = int(self._get_tor_config("onion.external_port"))
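Finally, a rough sketch tying the provider change back to the storage client: get_tor_instance() now hands back a txtorcon Tor object whether Tor was launched or connected to, so callers can reach both the control protocol and web_agent() from one place. The provider argument here is assumed to be the _Provider returned by tor_provider.create():

    from twisted.internet import reactor
    from twisted.internet.defer import inlineCallbacks
    from twisted.web.client import HTTPConnectionPool

    @inlineCallbacks
    def tor_agent_from_provider(provider):
        # Launches Tor ([tor] launch = true) or connects to the configured
        # control.port; both paths yield a txtorcon.Tor instance.
        tor = yield provider.get_tor_instance(reactor)
        # The control protocol is still available for EphemeralHiddenService:
        print("control protocol:", tor.protocol)
        # ...and web_agent() is what the HTTP storage client now builds on.
        agent = tor.web_agent(pool=HTTPConnectionPool(reactor, persistent=False))
        return agent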