Merge remote-tracking branch 'origin/master' into 3935-happy-eyeballs

This commit is contained in:
Itamar Turner-Trauring 2023-03-08 10:45:55 -05:00
commit cf00c6918c
37 changed files with 2672 additions and 103 deletions

View File

@ -5,6 +5,23 @@ To run:
$ pytest benchmarks/upload_download.py -s -v -Wignore
To add latency of e.g. 60ms on Linux (netem applies the delay in each
direction, so 30ms gives roughly a 60ms round-trip):
$ tc qdisc add dev lo root netem delay 30ms
To reset:
$ tc qdisc del dev lo root netem
Frequency scaling can spoil the results.
To see the range of frequency scaling on a Linux system:
$ cat /sys/devices/system/cpu/cpu*/cpufreq/scaling_available_frequencies
And to pin the CPU frequency to the lower bound found in these files:
$ sudo cpupower frequency-set -f <lowest available frequency>
TODO Parameterization (pytest?)
- Foolscap vs not foolscap
@ -30,6 +47,7 @@ from tempfile import mkdtemp
import os
from twisted.trial.unittest import TestCase
from twisted.internet.defer import gatherResults
from allmydata.util.deferredutil import async_to_deferred
from allmydata.util.consumer import MemoryConsumer
@ -56,6 +74,10 @@ class ImmutableBenchmarks(SystemTestMixin, TestCase):
# To use Foolscap, change to True:
FORCE_FOOLSCAP_FOR_STORAGE = False
# Don't reduce HTTP connection timeouts, that messes up the more aggressive
# benchmarks:
REDUCE_HTTP_CLIENT_TIMEOUT = False
@async_to_deferred
async def setUp(self):
SystemTestMixin.setUp(self)
@ -104,3 +126,13 @@ class ImmutableBenchmarks(SystemTestMixin, TestCase):
with timeit("download"):
data = await result.download_best_version()
self.assertEqual(data, DATA)
@async_to_deferred
async def test_upload_mutable_in_parallel(self):
    """
    Benchmark 20 concurrent uploads of the same mutable-file data.

    Each upload creates a distinct mutable file (same DATA contents),
    and all 20 are started at once via ``gatherResults`` so the timing
    reflects parallel, not sequential, upload cost.
    """
    # To test larger files, change this:
    DATA = b"Some data to upload\n" * 1_000_000
    # NOTE(review): the leading space in " upload" appears to be for
    # column alignment with other timeit labels — confirm before changing.
    with timeit(" upload"):
        await gatherResults([
            self.clients[0].create_mutable_file(MutableData(DATA))
            for _ in range(20)
        ])

View File

@ -980,6 +980,9 @@ the node will not use an Introducer at all.
Such "introducerless" clients must be configured with static servers (described
below), or they will not be able to upload and download files.
.. _server_list:
Static Server Definitions
=========================

View File

@ -32,6 +32,7 @@ Contents:
gpg-setup
servers
managed-grid
helper
convergence-secret
garbage-collection

342
docs/managed-grid.rst Normal file
View File

@ -0,0 +1,342 @@
Managed Grid
============
This document explains the "Grid Manager" concept and the
`grid-manager` command. Someone operating a grid may choose to use a
Grid Manager. Operators of storage-servers and clients will then be
given additional configuration in this case.
Overview and Motivation
-----------------------
In a grid using an Introducer, a client will use any storage-server
the Introducer announces (and the Introducer will announce any
storage-server that connects to it). This means that anyone with the
Introducer fURL can connect storage to the grid.
Sometimes, this is just what you want!
For some use-cases, though, you want to have clients only use certain
servers. One case might be a "managed" grid, where some entity runs
the grid; clients of this grid don't want their uploads to go to
"unmanaged" storage if some other client decides to provide storage.
One way to limit which storage servers a client connects to is via the
"server list" (:ref:`server_list`) (aka "Introducerless"
mode). Clients are given static lists of storage-servers, and connect
only to those. This means manually updating these lists if the storage
servers change, however.
Another method is for clients to use `[client] peers.preferred=`
configuration option (:ref:`Client Configuration`), which suffers
from a similar disadvantage.
Grid Manager
------------
A "grid-manager" consists of some data defining a keypair (along with
some other details) and Tahoe sub-commands to manipulate the data and
produce certificates to give to storage-servers. Certificates assert
the statement: "Grid Manager X suggests you use storage-server Y to
upload shares to" (X and Y are public-keys). Such a certificate
consists of:
- the version of the format the certificate conforms to (`1`)
- the public-key of a storage-server
- an expiry timestamp
- a signature of the above
A client will always use any storage-server for downloads (expired
certificate, or no certificate) because clients check the ciphertext
and re-assembled plaintext against the keys in the capability;
"grid-manager" certificates only control uploads.
Clients make use of this functionality by configuring one or more Grid Manager public keys.
This tells the client to only upload to storage-servers that have a currently-valid certificate from any of the Grid Managers their client allows.
In case none are configured, the default behavior (of using any storage server) prevails.
Grid Manager Data Storage
-------------------------
The data defining the grid-manager is stored in an arbitrary
directory, which you indicate with the ``--config`` option (in the
future, we may add the ability to store the data directly in a grid,
at which time you may be able to pass a directory-capability to this
option).
If you don't want to store the configuration on disk at all, you may
use ``--config -`` (the last character is a dash) and write a valid
JSON configuration to stdin.
All commands require the ``--config`` option and they all behave
similarly for "data from stdin" versus "data from disk". A directory
(and not a file) is used on disk because in that mode, each
certificate issued is also stored alongside the configuration
document; in "stdin / stdout" mode, an issued certificate is only
ever available on stdout.
The configuration is a JSON document. It is subject to change as Grid
Manager evolves. It contains a version number in the
`grid_manager_config_version` key which will increment whenever the
document schema changes.
grid-manager create
```````````````````
Create a new grid-manager.
If you specify ``--config -`` then a new grid-manager configuration is
written to stdout. Otherwise, a new grid-manager is created in the
directory specified by the ``--config`` option. It is an error if the
directory already exists.
grid-manager public-identity
````````````````````````````
Print out a grid-manager's public key. This key is derived from the
private-key of the grid-manager, so a valid grid-manager config must
be given via ``--config``
This public key is what is put in clients' configuration to actually
validate and use grid-manager certificates.
grid-manager add
````````````````
Takes two args: ``name pubkey``. The ``name`` is an arbitrary local
identifier for the new storage node (also sometimes called "a petname"
or "nickname"). The pubkey is the tahoe-encoded key from a ``node.pubkey``
file in the storage-server's node directory (minus any
whitespace). For example, if ``~/storage0`` contains a storage-node,
you might do something like this::
grid-manager --config ./gm0 add storage0 $(cat ~/storage0/node.pubkey)
This adds a new storage-server to a Grid Manager's
configuration. (Since it mutates the configuration, if you used
``--config -`` the new configuration will be printed to stdout). The
usefulness of the ``name`` is solely for reference within this Grid
Manager.
grid-manager list
`````````````````
Lists all storage-servers that have previously been added using
``grid-manager add``.
grid-manager sign
`````````````````
Takes two args: ``name expiry_days``. The ``name`` is a nickname used
previously in a ``grid-manager add`` command and ``expiry_days`` is
the number of days in the future when the certificate should expire.
Note that this mutates the state of the grid-manager if it is on disk,
by adding this certificate to our collection of issued
certificates. If you used ``--config -``, the certificate isn't
persisted anywhere except to stdout (so if you wish to keep it
somewhere, that is up to you).
This command creates a new "version 1" certificate for a
storage-server (identified by its public key). The new certificate is
printed to stdout. If you stored the config on disk, the new
certificate will (also) be in a file named like ``alice.cert.0``.
Enrolling a Storage Server: CLI
-------------------------------
tahoe admin add-grid-manager-cert
`````````````````````````````````
- `--filename`: the file to read the cert from
- `--name`: the name of this certificate
Import a "version 1" storage-certificate produced by a grid-manager. A
storage server may have zero or more such certificates installed; for
now just one is sufficient. You will have to re-start your node after
this. Subsequent announcements to the Introducer will include this
certificate.
.. note::
This command will simply edit the `tahoe.cfg` file and direct you
to re-start. In the Future(tm), we should consider (in exarkun's
words):
"A python program you run as a new process" might not be the
best abstraction to layer on top of the configuration
persistence system, though. It's a nice abstraction for users
(although most users would probably rather have a GUI) but it's
not a great abstraction for automation. So at some point it
may be better if there is CLI -> public API -> configuration
persistence system. And maybe "public API" is even a network
API for the storage server so it's equally easy to access from
an agent implemented in essentially any language and maybe if
the API is exposed by the storage node itself then this also
gives you live-configuration-updates, avoiding the need for
node restarts (not that this is the only way to accomplish
this, but I think it's a good way because it avoids the need
for messes like inotify and it supports the notion that the
storage node process is in charge of its own configuration
persistence system, not just one consumer among many ... which
has some nice things going for it ... though how this interacts
exactly with further node management automation might bear
closer scrutiny).
Enrolling a Storage Server: Config
----------------------------------
You may edit the ``[storage]`` section of the ``tahoe.cfg`` file to
turn on grid-management with ``grid_management = true``. You then must
also provide a ``[grid_manager_certificates]`` section in the
config-file which lists ``name = path/to/certificate`` pairs.
These certificate files are issued by the ``grid-manager sign``
command; these should be transmitted to the storage server operator
who includes them in the config for the storage server. Relative paths
are based from the node directory. Example::
[storage]
grid_management = true
[grid_management_certificates]
default = example_grid.cert
This will cause us to give this certificate to any Introducers we
connect to (and subsequently, the Introducer will give the certificate
out to clients).
Enrolling a Client: Config
--------------------------
You may instruct a Tahoe client to use only storage servers from given
Grid Managers. If there are no such keys, any servers are used
(but see https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3979). If
there are one or more keys, the client will only upload to a storage
server that has a valid certificate (from any of the keys).
To specify public-keys, add a ``[grid_managers]`` section to the
config. This consists of ``name = value`` pairs where ``name`` is an
arbitrary name and ``value`` is a public-key of a Grid
Manager. Example::
[grid_managers]
example_grid = pub-v0-vqimc4s5eflwajttsofisp5st566dbq36xnpp4siz57ufdavpvlq
See also https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3507 which
proposes a command to edit the config.
Example Setup of a New Managed Grid
-----------------------------------
This example creates an actual grid, but it's all just on one machine
with different "node directories" and a separate tahoe process for
each node. Usually of course each storage server would be on a
separate computer.
Note that we use the ``daemonize`` command in the following but that's
only one way to handle "running a command in the background". You
could instead run commands that start with ``daemonize ...`` in their
own shell/terminal window or via something like ``systemd``.
We'll store our Grid Manager configuration on disk, in
``./gm0``. To initialize this directory::
grid-manager --config ./gm0 create
(If you already have a grid, you can :ref:`skip ahead <skip_ahead>`.)
First of all, create an Introducer. Note that we actually have to run
it briefly before it creates the "Introducer fURL" we want for the
next steps::
tahoe create-introducer --listen=tcp --port=5555 --location=tcp:localhost:5555 ./introducer
daemonize tahoe -d introducer run
Next, we attach a couple of storage nodes::
tahoe create-node --introducer $(cat introducer/private/introducer.furl) --nickname storage0 --webport 6001 --location tcp:localhost:6003 --port 6003 ./storage0
tahoe create-node --introducer $(cat introducer/private/introducer.furl) --nickname storage1 --webport 6101 --location tcp:localhost:6103 --port 6103 ./storage1
daemonize tahoe -d storage0 run
daemonize tahoe -d storage1 run
.. _skip_ahead:
We can now tell the Grid Manager about our new storage servers::
grid-manager --config ./gm0 add storage0 $(cat storage0/node.pubkey)
grid-manager --config ./gm0 add storage1 $(cat storage1/node.pubkey)
To produce a new certificate for each node, we do this::
grid-manager --config ./gm0 sign storage0 > ./storage0/gridmanager.cert
grid-manager --config ./gm0 sign storage1 > ./storage1/gridmanager.cert
Now, we want our storage servers to actually announce these
certificates into the grid. We do this by adding some configuration
(in ``tahoe.cfg``)::
[storage]
grid_management = true
[grid_manager_certificates]
default = gridmanager.cert
Add the above bit to each node's ``tahoe.cfg`` and re-start the
storage nodes. (Alternatively, use the ``tahoe admin
add-grid-manager-cert`` command).
Now try adding a new storage server ``storage2``. This client can join
the grid just fine, and announce itself to the Introducer as providing
storage::
tahoe create-node --introducer $(cat introducer/private/introducer.furl) --nickname storage2 --webport 6301 --location tcp:localhost:6303 --port 6303 ./storage2
daemonize tahoe -d storage2 run
At this point any client will upload to any of these three
storage-servers. Make a client "alice" and try!
::
tahoe create-client --introducer $(cat introducer/private/introducer.furl) --nickname alice --webport 6401 --shares-total=3 --shares-needed=2 --shares-happy=3 ./alice
daemonize tahoe -d alice run
tahoe -d alice put README.rst # prints out a read-cap
find storage2/storage/shares # confirm storage2 has a share
Now we want to make Alice only upload to the storage servers that the
grid-manager has given certificates to (``storage0`` and
``storage1``). We need the grid-manager's public key to put in Alice's
configuration::
grid-manager --config ./gm0 public-identity
Put the key printed out above into Alice's ``tahoe.cfg`` in section
``client``::
[grid_managers]
example_name = pub-v0-vqimc4s5eflwajttsofisp5st566dbq36xnpp4siz57ufdavpvlq
Now, re-start the "alice" client. Since we made Alice's parameters
require 3 storage servers to be reachable (``--happy=3``), all their
uploads should now fail (so ``tahoe put`` will fail) because they
won't use storage2 and thus can't "achieve happiness".
A proposal to expose more information about Grid Manager and
certificate status in the Welcome page is discussed in
https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3506

View File

@ -55,9 +55,12 @@ def i2p_network(reactor, temp_dir, request):
proto,
which("docker"),
(
"docker", "run", "-p", "7656:7656", "purplei2p/i2pd:release-2.43.0",
"docker", "run", "-p", "7656:7656", "purplei2p/i2pd:release-2.45.1",
# Bad URL for reseeds, so it can't talk to other routers.
"--reseed.urls", "http://localhost:1/",
# Make sure we see the "ephemeral keys message"
"--log=stdout",
"--loglevel=info"
),
)

View File

@ -0,0 +1 @@
Tahoe-LAFS now includes a new "Grid Manager" specification and implementation adding more options to control which storage servers a client will use for uploads.

0
newsfragments/3968.minor Normal file
View File

View File

@ -144,6 +144,9 @@ install_requires = [
# amount of copying involved.
"pycddl >= 0.4",
# Command-line parsing
"click >= 7.0",
# for pid-file support
"psutil",
"filelock",
@ -428,6 +431,11 @@ setup(name="tahoe-lafs", # also set in __init__.py
},
include_package_data=True,
setup_requires=setup_requires,
entry_points = { 'console_scripts': [ 'tahoe = allmydata.scripts.runner:run' ] },
entry_points={
'console_scripts': [
'tahoe = allmydata.scripts.runner:run',
'grid-manager = allmydata.cli.grid_manager:grid_manager',
]
},
**setup_args
)

View File

View File

@ -0,0 +1,224 @@
"""
A CLI for configuring a grid manager.
"""
from typing import Optional
from datetime import (
timedelta,
)
import click
from twisted.python.filepath import (
FilePath,
)
from allmydata.crypto import (
ed25519,
)
from allmydata.util.abbreviate import (
abbreviate_time,
)
from allmydata.grid_manager import (
create_grid_manager,
save_grid_manager,
load_grid_manager,
current_datetime_with_zone,
)
from allmydata.util import jsonbytes as json
@click.group()
@click.option(
    '--config', '-c',
    type=click.Path(),
    help="Configuration directory (or - for stdin)",
    required=True,
)
@click.pass_context
def grid_manager(ctx, config):
    """
    A Tahoe Grid Manager issues certificates to storage-servers

    A Tahoe client with one or more Grid Manager public keys
    configured will only upload to a Storage Server that presents a
    valid certificate signed by one of the configured Grid
    Manager keys.

    Grid Manager configuration can be in a local directory or given
    via stdin. It contains long-term secret information (a private
    signing key) and should be kept safe.
    """
    class Config(object):
        """
        Available to all sub-commands as Click's context.obj
        """
        # cached _GridManager instance; loaded on first access
        _grid_manager = None

        @property
        def grid_manager(self):
            # Load lazily so sub-commands that don't need an existing
            # configuration (e.g. "create") never trigger a load error.
            if self._grid_manager is None:
                config_path = _config_path_from_option(config)
                try:
                    self._grid_manager = load_grid_manager(config_path)
                except ValueError as e:
                    raise click.ClickException(
                        "Error loading Grid Manager from '{}': {}".format(config, e)
                    )
            return self._grid_manager

    ctx.obj = Config()
@grid_manager.command()
@click.pass_context
def create(ctx):
    """
    Make a new Grid Manager

    With ``--config -`` the new configuration is written to stdout;
    otherwise a new configuration directory is created on disk (it is
    an error if it already exists).
    """
    where = ctx.parent.params["config"]
    # "-" selects stdin/stdout mode: no on-disk directory at all.
    destination = FilePath(where) if where != '-' else None
    new_manager = create_grid_manager()
    try:
        save_grid_manager(destination, new_manager)
    except OSError as err:
        raise click.ClickException(
            "Can't create '{}': {}".format(where, err)
        )
@grid_manager.command()
@click.pass_obj
def public_identity(cfg):
    """
    Show the public identity key of a Grid Manager

    This is what you give to clients to add to their configuration so
    they use announcements from this Grid Manager
    """
    manager = cfg.grid_manager
    click.echo(manager.public_identity())
@grid_manager.command()
@click.argument("name")
@click.argument("public_key", type=click.STRING)
@click.pass_context
def add(ctx, name, public_key):
    """
    Add a new storage-server by name to a Grid Manager

    PUBLIC_KEY is the contents of a node.pubkey file from a Tahoe
    node-directory. NAME is an arbitrary label.
    """
    key_bytes = public_key.encode("ascii")
    manager = ctx.obj.grid_manager
    try:
        manager.add_storage_server(
            name,
            ed25519.verifying_key_from_string(key_bytes),
        )
    except KeyError:
        raise click.ClickException(
            "A storage-server called '{}' already exists".format(name)
        )
    # Persist the mutated configuration (stdout in "-" mode).
    config_path = _config_path_from_option(ctx.parent.params["config"])
    save_grid_manager(config_path, manager, create=False)
    return 0
@grid_manager.command()
@click.argument("name")
@click.pass_context
def remove(ctx, name):
    """
    Remove an existing storage-server by name from a Grid Manager
    """
    config_fp = _config_path_from_option(ctx.parent.params["config"])
    manager = ctx.obj.grid_manager
    try:
        manager.remove_storage_server(name)
    except KeyError:
        raise click.ClickException(
            "No storage-server called '{}' exists".format(name)
        )
    if config_fp is not None:
        # Also delete every certificate file previously issued for
        # this server (name.cert.0, name.cert.1, ...).
        index = 0
        while True:
            cert_fp = config_fp.child('{}.cert.{}'.format(name, index))
            if not cert_fp.exists():
                break
            cert_fp.remove()
            index += 1
    save_grid_manager(config_fp, manager, create=False)
@grid_manager.command()  # noqa: F811
@click.pass_context
def list(ctx):
    """
    List all storage-servers known to a Grid Manager

    For each server, prints its public key followed by one line per
    issued certificate with its validity status.
    """
    servers = ctx.obj.grid_manager.storage_servers
    for name in sorted(servers):
        server = servers[name]
        padding = " " * len(name)
        click.echo("{}: {}".format(
            name,
            str(server.public_key_string(), "utf-8")))
        for cert in server.certificates:
            # Negative age means the expiry is still in the future.
            age = current_datetime_with_zone() - cert.expires
            click.echo("{} cert {}: ".format(padding, cert.index), nl=False)
            if age.total_seconds() < 0:
                status = "valid until {} ({})".format(cert.expires, abbreviate_time(age))
            else:
                status = "expired {} ({})".format(cert.expires, abbreviate_time(age))
            click.echo(status)
@grid_manager.command()
@click.argument("name")
@click.argument(
    "expiry_days",
    type=click.IntRange(1, 5*365),  # XXX is 5 years a good maximum?
)
@click.pass_context
def sign(ctx, name, expiry_days):
    """
    sign a new certificate

    Issues a fresh certificate for NAME expiring EXPIRY_DAYS from now,
    prints it to stdout, and (in on-disk mode) stores it under the
    next free serial number as ``NAME.cert.N``.
    """
    fp = _config_path_from_option(ctx.parent.params["config"])
    try:
        certificate = ctx.obj.grid_manager.sign(name, timedelta(days=expiry_days))
    except KeyError:
        raise click.ClickException(
            "No storage-server called '{}' exists".format(name)
        )

    certificate_data = json.dumps(certificate.marshal(), indent=4)
    click.echo(certificate_data)
    if fp is not None:
        # Find the lowest unused serial and create the cert file
        # exclusively, so a concurrent signer can't clobber ours.
        serial = 0
        outfile = None
        while outfile is None:
            fname = "{}.cert.{}".format(name, serial)
            try:
                outfile = fp.child(fname).create()
            except FileExistsError:
                serial += 1
            except OSError as e:
                raise click.ClickException(f"{fname}: {e}")
        with outfile:
            outfile.write(certificate_data.encode("ascii"))
def _config_path_from_option(config: str) -> Optional[FilePath]:
    """
    Convert the textual ``--config`` value into a filesystem location.

    :param config: a path, or "-" meaning stdin/stdout mode
    :returns: a FilePath instance, or None when ``config`` is "-"
    """
    return None if config == "-" else FilePath(config)

View File

@ -3,8 +3,11 @@ Ported to Python 3.
"""
from __future__ import annotations
import os
import stat
import time
import weakref
from typing import Optional
import os, stat, time, weakref
from base64 import urlsafe_b64encode
from functools import partial
# On Python 2 this will be the backported package:
@ -26,6 +29,7 @@ from twisted.application.internet import TimerService
from twisted.python.filepath import FilePath
import allmydata
from allmydata import node
from allmydata.crypto import rsa, ed25519
from allmydata.crypto.util import remove_prefix
from allmydata.storage.server import StorageServer, FoolscapStorageServer
@ -56,7 +60,6 @@ from allmydata.interfaces import (
)
from allmydata.nodemaker import NodeMaker
from allmydata.blacklist import Blacklist
from allmydata import node
KiB=1024
@ -73,7 +76,8 @@ def _is_valid_section(section_name):
"""
return (
section_name.startswith("storageserver.plugins.") or
section_name.startswith("storageclient.plugins.")
section_name.startswith("storageclient.plugins.") or
section_name in ("grid_managers", "grid_manager_certificates")
)
@ -106,6 +110,7 @@ _client_config = configutil.ValidConfiguration(
"reserved_space",
"storage_dir",
"plugins",
"grid_management",
"force_foolscap",
),
"sftpd": (
@ -490,6 +495,7 @@ def create_storage_farm_broker(config, default_connection_handlers, foolscap_con
**kwargs
)
# create the actual storage-broker
sb = storage_client.StorageFarmBroker(
permute_peers=True,
tub_maker=tub_creator,
@ -796,7 +802,8 @@ class _Client(node.Node, pollmixin.PollMixin):
sharetypes.append("mutable")
expiration_sharetypes = tuple(sharetypes)
ss = StorageServer(storedir, self.nodeid,
ss = StorageServer(
storedir, self.nodeid,
reserved_space=reserved,
discard_storage=discard,
readonly_storage=readonly,
@ -805,7 +812,8 @@ class _Client(node.Node, pollmixin.PollMixin):
expiration_mode=mode,
expiration_override_lease_duration=o_l_d,
expiration_cutoff_date=cutoff_date,
expiration_sharetypes=expiration_sharetypes)
expiration_sharetypes=expiration_sharetypes,
)
ss.setServiceParent(self)
return ss
@ -847,6 +855,14 @@ class _Client(node.Node, pollmixin.PollMixin):
announcement.update(plugins_announcement)
if self.config.get_config("storage", "grid_management", default=False, boolean=True):
grid_manager_certificates = self.config.get_grid_manager_certificates()
announcement[u"grid-manager-certificates"] = grid_manager_certificates
# Note: certificates are not verified for validity here, but
# that may be useful. See:
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3977
for ic in self.introducer_clients:
ic.publish("storage", announcement, self._node_private_key)

View File

@ -13,20 +13,7 @@ cut-and-pasteability. The base62 encoding is shorter than the base32 form,
but the minor usability improvement is not worth the documentation and
specification confusion of using a non-standard encoding. So we stick with
base32.
Ported to Python 3.
'''
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
import six
from cryptography.exceptions import (
InvalidSignature,
@ -72,7 +59,7 @@ def verifying_key_from_signing_key(private_key):
return private_key.public_key()
def sign_data(private_key, data):
def sign_data(private_key, data: bytes) -> bytes:
"""
Sign the given data using the given private key
@ -86,7 +73,7 @@ def sign_data(private_key, data):
"""
_validate_private_key(private_key)
if not isinstance(data, six.binary_type):
if not isinstance(data, bytes):
raise ValueError('data must be bytes')
return private_key.sign(data)
@ -110,7 +97,7 @@ def string_from_signing_key(private_key):
return PRIVATE_KEY_PREFIX + b2a(raw_key_bytes)
def signing_keypair_from_string(private_key_bytes):
def signing_keypair_from_string(private_key_bytes: bytes):
"""
Load a signing keypair from a string of bytes (which includes the
PRIVATE_KEY_PREFIX)
@ -118,7 +105,7 @@ def signing_keypair_from_string(private_key_bytes):
:returns: a 2-tuple of (private_key, public_key)
"""
if not isinstance(private_key_bytes, six.binary_type):
if not isinstance(private_key_bytes, bytes):
raise ValueError('private_key_bytes must be bytes')
private_key = Ed25519PrivateKey.from_private_bytes(
@ -127,7 +114,7 @@ def signing_keypair_from_string(private_key_bytes):
return private_key, private_key.public_key()
def verify_signature(public_key, alleged_signature, data):
def verify_signature(public_key, alleged_signature: bytes, data: bytes):
"""
:param public_key: a verifying key
@ -139,10 +126,10 @@ def verify_signature(public_key, alleged_signature, data):
:returns: None (or raises an exception).
"""
if not isinstance(alleged_signature, six.binary_type):
if not isinstance(alleged_signature, bytes):
raise ValueError('alleged_signature must be bytes')
if not isinstance(data, six.binary_type):
if not isinstance(data, bytes):
raise ValueError('data must be bytes')
_validate_public_key(public_key)
@ -159,7 +146,7 @@ def verifying_key_from_string(public_key_bytes):
:returns: a public_key
"""
if not isinstance(public_key_bytes, six.binary_type):
if not isinstance(public_key_bytes, bytes):
raise ValueError('public_key_bytes must be bytes')
return Ed25519PublicKey.from_public_bytes(
@ -167,7 +154,7 @@ def verifying_key_from_string(public_key_bytes):
)
def string_from_verifying_key(public_key):
def string_from_verifying_key(public_key) -> bytes:
"""
Encode a public key to a string of bytes
@ -183,7 +170,7 @@ def string_from_verifying_key(public_key):
return PUBLIC_KEY_PREFIX + b2a(raw_key_bytes)
def _validate_public_key(public_key):
def _validate_public_key(public_key: Ed25519PublicKey):
"""
Internal helper. Verify that `public_key` is an appropriate object
"""
@ -192,7 +179,7 @@ def _validate_public_key(public_key):
return None
def _validate_private_key(private_key):
def _validate_private_key(private_key: Ed25519PrivateKey):
"""
Internal helper. Verify that `private_key` is an appropriate object
"""

View File

@ -0,0 +1,495 @@
"""
Functions and classes relating to the Grid Manager internal state
"""
import sys
from datetime import (
datetime,
timezone,
)
from typing import (
Optional,
Union,
List,
)
from twisted.python.filepath import FilePath
from allmydata.crypto import (
ed25519,
)
from allmydata.util import (
base32,
jsonbytes as json,
dictutil,
)
from attrs import (
frozen,
Factory,
)
@frozen
class SignedCertificate(object):
    """
    A certificate together with the Grid Manager's signature over it.
    """
    # JSON-serialized, UTF-8-encoded certificate body
    certificate : bytes

    # raw signature bytes (the "public" form carries these as base32,
    # but this attribute holds the decoded bytes)
    signature : bytes

    @classmethod
    def load(cls, file_like):
        """Deserialize a SignedCertificate from an open file of JSON."""
        parsed = json.load(file_like)
        return cls(
            certificate=parsed["certificate"].encode("utf-8"),
            signature=base32.a2b(parsed["signature"].encode("ascii")),
        )

    def marshal(self):
        """
        :return dict: a json-able dict
        """
        return {
            "certificate": self.certificate,
            "signature": base32.b2a(self.signature),
        }
@frozen
class _GridManagerStorageServer(object):
    """
    A Grid Manager's notion of a storage server
    """
    # local petname for this server
    name : str
    # the server's verifying key
    public_key : ed25519.Ed25519PublicKey
    # SignedCertificate instances issued for this server
    certificates : list = Factory(list)

    def add_certificate(self, certificate):
        """
        Record ``certificate`` as having been issued for this server.
        """
        self.certificates.append(certificate)

    def public_key_string(self) -> bytes:
        """
        :returns: the public key as bytes.
        """
        return ed25519.string_from_verifying_key(self.public_key)

    def marshal(self):
        """
        :returns: a dict suitable for JSON representing this object
        """
        # Note: certificates are deliberately not serialized here; they
        # live in separate per-certificate files.
        return {
            u"public_key": self.public_key_string(),
        }
@frozen
class _GridManagerCertificate(object):
    """
    Represents a single certificate for a single storage-server
    """
    # path of the file this certificate was loaded from
    filename : str
    # serial number of the certificate (the N in ``<name>.cert.N``)
    index : int
    # expiry timestamp parsed from the certificate body
    expires : datetime
    # the storage-server's public key this certificate vouches for
    public_key : ed25519.Ed25519PublicKey
def create_grid_manager():
    """
    Create a new Grid Manager with a fresh keypair

    :returns: a _GridManager with a newly-generated signing key and no
        storage servers.
    """
    private_key, _ = ed25519.create_signing_keypair()
    key_bytes = ed25519.string_from_signing_key(private_key)
    return _GridManager(key_bytes, {})
def current_datetime_with_zone():
    """
    :returns: a timezone-aware datetime object representing the
        current timestamp in UTC
    """
    return datetime.now(tz=timezone.utc)
def _load_certificates_for(config_path: FilePath, name: str, gm_key: Optional[ed25519.Ed25519PublicKey] = None) -> List[_GridManagerCertificate]:
    """
    Load any existing certificates for the given storage-server.

    :param FilePath config_path: the configuration directory to read
        certificate files from

    :param str name: the name of an existing storage-server

    :param ed25519.Ed25519PublicKey gm_key: an optional Grid Manager
        public key. If provided, certificates will be verified against it.

    :returns: list containing any known certificates (may be empty)

    :raises: ed25519.BadSignature if any certificate signature fails to verify
    """
    # Fix: the original signature was ``gm_key=Optional[...]`` which made
    # the *default value* the typing object (truthy, not None); omitting
    # gm_key would then wrongly attempt signature validation against it.
    cert_index = 0
    cert_path = config_path.child('{}.cert.{}'.format(name, cert_index))
    certificates = []
    # Certificate files are numbered consecutively from 0; stop at the
    # first missing serial.
    while cert_path.exists():
        with cert_path.open('r') as cert_file:
            container = SignedCertificate.load(cert_file)
        if gm_key is not None:
            validate_grid_manager_certificate(gm_key, container)
        cert_data = json.loads(container.certificate)
        if cert_data['version'] != 1:
            raise ValueError(
                "Unknown certificate version '{}' in '{}'".format(
                    cert_data['version'],
                    cert_path.path,
                )
            )
        certificates.append(
            _GridManagerCertificate(
                filename=cert_path.path,
                index=cert_index,
                expires=datetime.fromisoformat(cert_data['expires']),
                public_key=ed25519.verifying_key_from_string(cert_data['public_key'].encode('ascii')),
            )
        )
        cert_index += 1
        cert_path = config_path.child('{}.cert.{}'.format(name, cert_index))
    return certificates
def load_grid_manager(config_path: Optional[FilePath]):
    """
    Load a Grid Manager from existing configuration.

    :param FilePath config_path: the configuration location (or None for
        stdin)

    :returns: a GridManager instance

    :raises: ValueError if the configuration is invalid or IOError if
        expected files can't be opened.
    """
    if config_path is None:
        config_file = sys.stdin
    else:
        # this might raise IOError or similar but caller must handle it
        config_file = config_path.child("config.json").open("r")

    with config_file:
        config = json.load(config_file)

    gm_version = config.get(u'grid_manager_config_version', None)
    if gm_version != 0:
        raise ValueError(
            "Missing or unknown version '{}' of Grid Manager config".format(
                gm_version
            )
        )
    if 'private_key' not in config:
        raise ValueError(
            "'private_key' required in config"
        )

    private_key_bytes = config['private_key'].encode('ascii')
    try:
        private_key, public_key = ed25519.signing_keypair_from_string(private_key_bytes)
    except Exception as e:
        # chain the original exception so key-format problems remain
        # debuggable from the traceback
        raise ValueError(
            "Invalid Grid Manager private_key: {}".format(e)
        ) from e

    storage_servers = dict()
    for name, srv_config in list(config.get(u'storage_servers', {}).items()):
        if 'public_key' not in srv_config:
            raise ValueError(
                "No 'public_key' for storage server '{}'".format(name)
            )
        storage_servers[name] = _GridManagerStorageServer(
            name,
            ed25519.verifying_key_from_string(srv_config['public_key'].encode('ascii')),
            # certificates live only on disk, so a stdin-based config
            # can have none to load
            [] if config_path is None else _load_certificates_for(config_path, name, public_key),
        )

    return _GridManager(private_key_bytes, storage_servers)
class _GridManager(object):
    """
    A Grid Manager's configuration.
    """

    def __init__(self, private_key_bytes, storage_servers):
        # mapping of unicode server-name -> _GridManagerStorageServer
        self._storage_servers = dictutil.UnicodeKeyDict(
            {} if storage_servers is None else storage_servers
        )
        assert isinstance(private_key_bytes, bytes)
        self._private_key_bytes = private_key_bytes
        # derive the keypair once up front so sign() and
        # public_identity() don't have to re-parse the key bytes
        self._private_key, self._public_key = ed25519.signing_keypair_from_string(self._private_key_bytes)
        # serialization version emitted by marshal()
        self._version = 0

    @property
    def storage_servers(self):
        # all storage-servers this Grid Manager knows about
        return self._storage_servers

    def public_identity(self):
        """
        :returns: public key as a string
        """
        return ed25519.string_from_verifying_key(self._public_key)

    def sign(self, name, expiry):
        """
        Create a new signed certificate for a particular server

        :param str name: the server to create a certificate for

        :param timedelta expiry: how far in the future the certificate
            should expire.

        :returns SignedCertificate: the signed certificate.

        :raises KeyError: if no storage-server of that name is known.
        """
        assert isinstance(name, str)  # must be unicode
        try:
            srv = self._storage_servers[name]
        except KeyError:
            raise KeyError(
                "No storage server named '{}'".format(name)
            )
        expiration = current_datetime_with_zone() + expiry
        cert_info = {
            "expires": expiration.isoformat(),
            "public_key": srv.public_key_string(),
            "version": 1,
        }
        # canonical serialization (sorted keys, no extra whitespace) so
        # the signed bytes are deterministic
        cert_data = json.dumps_bytes(cert_info, separators=(',',':'), sort_keys=True)
        sig = ed25519.sign_data(self._private_key, cert_data)
        certificate = SignedCertificate(
            certificate=cert_data,
            signature=sig,
        )
        # sanity-check our own signature before handing the cert out
        vk = ed25519.verifying_key_from_signing_key(self._private_key)
        ed25519.verify_signature(vk, sig, cert_data)
        srv.add_certificate(certificate)
        return certificate

    def add_storage_server(self, name, public_key):
        """
        :param name: a user-meaningful name for the server

        :param public_key: ed25519.VerifyingKey the public-key of the
            storage provider (e.g. from the contents of node.pubkey
            for the client)

        :raises KeyError: if a server of that name already exists.
        """
        assert isinstance(name, str)  # must be unicode
        if name in self._storage_servers:
            raise KeyError(
                "Already have a storage server called '{}'".format(name)
            )
        ss = _GridManagerStorageServer(name, public_key, [])
        self._storage_servers[name] = ss
        return ss

    def remove_storage_server(self, name):
        """
        :param name: a user-meaningful name for the server

        :raises KeyError: if no server of that name is known.
        """
        assert isinstance(name, str)  # must be unicode
        try:
            del self._storage_servers[name]
        except KeyError:
            raise KeyError(
                "No storage server called '{}'".format(name)
            )

    def marshal(self):
        """
        :returns: a dict suitable for JSON representing this object
        """
        data = {
            u"grid_manager_config_version": self._version,
            u"private_key": self._private_key_bytes.decode('ascii'),
        }
        # only emit the key at all if we have at least one server
        if self._storage_servers:
            data[u"storage_servers"] = {
                name: srv.marshal()
                for name, srv
                in self._storage_servers.items()
            }
        return data
def save_grid_manager(file_path, grid_manager, create=True):
    """
    Writes a Grid Manager configuration.

    :param file_path: a FilePath specifying where to write the config
        (if None, stdout is used)

    :param grid_manager: a _GridManager instance

    :param bool create: if True (the default) we are creating a new
        grid-manager and will fail if the directory already exists.
    """
    serialized = json.dumps(
        grid_manager.marshal(),
        indent=4,
    )
    if file_path is None:
        # no destination directory: dump to stdout instead
        print("{}\n".format(serialized))
        return
    try:
        file_path.makedirs()
        file_path.chmod(0o700)
    except OSError:
        # a pre-existing directory is only an error when creating
        if create:
            raise
    with file_path.child("config.json").open("w") as f:
        f.write(serialized.encode("utf-8") + b"\n")
def parse_grid_manager_certificate(gm_data: Union[str, bytes]):
    """
    :param gm_data: some data that might be JSON that might be a valid
        Grid Manager Certificate

    :returns: json data of a valid Grid Manager certificate, or an
        exception if the data is not valid.
    """
    required_keys = {
        'certificate',
        'signature',
    }
    parsed = json.loads(gm_data)
    if not isinstance(parsed, dict):
        raise ValueError(
            "Grid Manager certificate must be a dict"
        )
    # exactly the required keys -- nothing missing, nothing extra
    if set(parsed.keys()) != required_keys:
        raise ValueError(
            "Grid Manager certificate must contain: {}".format(
                ", ".join("'{}'".format(k) for k in required_keys),
            )
        )
    return parsed
def validate_grid_manager_certificate(gm_key, alleged_cert):
    """
    :param gm_key: a VerifyingKey instance, a Grid Manager's public
        key.

    :param alleged_cert SignedCertificate: A signed certificate.

    :return: a dict consisting of the deserialized certificate data or
        None if the signature is invalid. Note we do NOT check the
        expiry time in this function.
    """
    try:
        ed25519.verify_signature(
            gm_key,
            alleged_cert.signature,
            alleged_cert.certificate,
        )
    except ed25519.BadSignature:
        # invalid signature: signal via None rather than raising
        return None
    # signature checks out, so it's safe to deserialize the payload
    return json.loads(alleged_cert.certificate)
def create_grid_manager_verifier(keys, certs, public_key, now_fn=None, bad_cert=None):
    """
    Creates a predicate for confirming some Grid Manager-issued
    certificates against Grid Manager keys. A predicate is used
    (instead of just returning True/False here) so that the
    expiry-time can be tested on each call.

    :param list keys: 0 or more ``VerifyingKey`` instances

    :param list certs: 1 or more Grid Manager certificates each of
        which is a ``SignedCertificate``.

    :param str public_key: the identifier of the server we expect
        certificates for. NOTE(review): the returned predicate compares
        this against *encoded bytes* (``cert['public_key'].encode(...)``),
        so callers appear to need to pass bytes despite the ``str`` in
        this docstring -- confirm against callers.

    :param callable now_fn: a callable which returns the current UTC
        timestamp (or current_datetime_with_zone() if None).

    :param callable bad_cert: a two-argument callable which is invoked
        when a certificate verification fails. The first argument is
        the verifying key and the second is the certificate. If None
        (the default) errors are print()-ed. Note that we may have
        several certificates and only one must be valid, so this may
        be called (multiple times) even if the function ultimately
        returns successfully.

    :returns: a callable which will return True only-if there is at
        least one valid certificate (that has not at this moment
        expired) in `certs` signed by one of the keys in `keys`.
    """
    now_fn = current_datetime_with_zone if now_fn is None else now_fn
    valid_certs = []

    # if we have zero grid-manager keys then everything is valid
    if not keys:
        return lambda: True

    if bad_cert is None:

        def bad_cert(key, alleged_cert):
            """
            We might want to let the user know about this failed-to-verify
            certificate .. but also if you have multiple grid-managers
            then a bunch of these messages would appear. Better would
            be to bubble this up to some sort of status API (or maybe
            on the Welcome page?)

            The only thing that might actually be interesting, though,
            is whether this whole function returns false or not..
            """
            print(
                "Grid Manager certificate signature failed. Certificate: "
                "\"{cert}\" for key \"{key}\".".format(
                    cert=alleged_cert,
                    key=ed25519.string_from_verifying_key(key),
                )
            )

    # validate the signatures on any certificates we have (not yet the expiry dates)
    # Signature checks happen once, here; only the expiry is re-checked
    # on each call of the returned predicate.
    for alleged_cert in certs:
        for key in keys:
            cert = validate_grid_manager_certificate(key, alleged_cert)
            if cert is not None:
                valid_certs.append(cert)
            else:
                bad_cert(key, alleged_cert)

    def validate():
        """
        :returns: True if *any* certificate is still valid for a server
        """
        now = now_fn()
        for cert in valid_certs:
            # NOTE(review): if a cert's "expires" is a naive ISO
            # timestamp, comparing with an aware `now` raises TypeError
            # -- presumably sign() always embeds an aware timestamp.
            expires = datetime.fromisoformat(cert["expires"])
            if cert['public_key'].encode("ascii") == public_key:
                if expires > now:
                    # not-expired
                    return True
        return False

    return validate

View File

@ -543,7 +543,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
# 0. Start with an ordered list of servers. Maybe *2N* of them.
#
all_servers = storage_broker.get_servers_for_psi(storage_index)
all_servers = storage_broker.get_servers_for_psi(storage_index, for_upload=True)
if not all_servers:
raise NoServersError("client gave us zero servers")

View File

@ -561,6 +561,12 @@ class IServer(IDisplayableServer):
once the connection is lost.
"""
def upload_permitted():
"""
:return: True if we should use this server for uploads, False
otherwise.
"""
def get_storage_server():
"""
Once a server is connected, I return an ``IStorageServer``.
@ -571,8 +577,6 @@ class IServer(IDisplayableServer):
"""
class IMutableSlotWriter(Interface):
"""
The interface for a writer around a mutable slot on a remote server.

View File

@ -35,6 +35,11 @@ from allmydata.mutable.layout import get_version_from_checkstring,\
MDMFSlotWriteProxy, \
SDMFSlotWriteProxy
from eliot import (
Message,
start_action,
)
KiB = 1024
DEFAULT_MUTABLE_MAX_SEGMENT_SIZE = 128 * KiB
PUSHING_BLOCKS_STATE = 0
@ -955,10 +960,29 @@ class Publish(object):
old_assignments.add(server, shnum)
serverlist = []
action = start_action(
action_type=u"mutable:upload:update_goal",
homeless_shares=len(homeless_shares),
)
with action:
for i, server in enumerate(self.full_serverlist):
serverid = server.get_serverid()
if server in self.bad_servers:
Message.log(
message_type=u"mutable:upload:bad-server",
server_id=serverid,
)
continue
# if we have >= 1 grid-managers, this checks that we have
# a valid certificate for this server
if not server.upload_permitted():
Message.log(
message_type=u"mutable:upload:no-gm-certs",
server_id=serverid,
)
continue
entry = (len(old_assignments.get(server, [])), i, serverid, server)
serverlist.append(entry)
serverlist.sort()

View File

@ -14,6 +14,7 @@ if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
from six import ensure_str, ensure_text
import json
import datetime
import os.path
import re
@ -350,6 +351,19 @@ class _Config(object):
"Unable to write config file '{}'".format(fn),
)
def enumerate_section(self, section):
"""
returns a dict containing all items in a configuration section. an
empty dict is returned if the section doesn't exist.
"""
answer = dict()
try:
for k in self.config.options(section):
answer[k] = self.config.get(section, k)
except configparser.NoSectionError:
pass
return answer
def items(self, section, default=_None):
try:
return self.config.items(section)
@ -484,6 +498,12 @@ class _Config(object):
"""
returns an absolute path inside the 'private' directory with any
extra args join()-ed
This exists for historical reasons. New code should ideally
not call this because it makes it harder for e.g. a SQL-based
_Config object to exist. Code that needs to call this method
should probably be a _Config method itself. See
e.g. get_grid_manager_certificates()
"""
return os.path.join(self._basedir, "private", *args)
@ -491,6 +511,12 @@ class _Config(object):
"""
returns an absolute path inside the config directory with any
extra args join()-ed
This exists for historical reasons. New code should ideally
not call this because it makes it harder for e.g. a SQL-based
_Config object to exist. Code that needs to call this method
should probably be a _Config method itself. See
e.g. get_grid_manager_certificates()
"""
# note: we re-expand here (_basedir already went through this
# expanduser function) in case the path we're being asked for
@ -499,6 +525,35 @@ class _Config(object):
os.path.join(self._basedir, *args)
)
def get_grid_manager_certificates(self):
"""
Load all Grid Manager certificates in the config.
:returns: A list of all certificates. An empty list is
returned if there are none.
"""
grid_manager_certificates = []
cert_fnames = list(self.enumerate_section("grid_manager_certificates").values())
for fname in cert_fnames:
fname = self.get_config_path(fname)
if not os.path.exists(fname):
raise ValueError(
"Grid Manager certificate file '{}' doesn't exist".format(
fname
)
)
with open(fname, 'r') as f:
cert = json.load(f)
if set(cert.keys()) != {"certificate", "signature"}:
raise ValueError(
"Unknown key in Grid Manager certificate '{}'".format(
fname
)
)
grid_manager_certificates.append(cert)
return grid_manager_certificates
def get_introducer_configuration(self):
"""
Get configuration for introducers.

View File

@ -89,7 +89,7 @@ class _FoolscapOrHttps(Protocol, metaclass=_PretendToBeNegotiation):
certificate=cls.tub.myCertificate.original,
)
http_storage_server = HTTPServer(storage_server, swissnum)
http_storage_server = HTTPServer(reactor, storage_server, swissnum)
cls.https_factory = TLSMemoryBIOFactory(
certificate_options,
False,

View File

@ -12,11 +12,6 @@ if PY2:
from six import ensure_binary
try:
from allmydata.scripts.types_ import SubCommands
except ImportError:
pass
from twisted.python import usage
from twisted.python.filepath import (
FilePath,
@ -29,6 +24,14 @@ from allmydata.storage import (
crawler,
expirer,
)
from allmydata.scripts.types_ import SubCommands
from allmydata.client import read_config
from allmydata.grid_manager import (
parse_grid_manager_certificate,
)
from allmydata.scripts.cli import _default_nodedir
from allmydata.util.encodingutil import argv_to_abspath
from allmydata.util import jsonbytes
class GenerateKeypairOptions(BaseOptions):
@ -75,6 +78,7 @@ def derive_pubkey(options):
print("public:", str(ed25519.string_from_verifying_key(public_key), "ascii"), file=out)
return 0
class MigrateCrawlerOptions(BasedirOptions):
def getSynopsis(self):
@ -94,6 +98,61 @@ class MigrateCrawlerOptions(BasedirOptions):
return t
class AddGridManagerCertOptions(BaseOptions):
"""
Options for add-grid-manager-cert
"""
optParameters = [
['filename', 'f', None, "Filename of the certificate ('-', a dash, for stdin)"],
['name', 'n', None, "Name to give this certificate"],
]
def getSynopsis(self):
return "Usage: tahoe [global-options] admin add-grid-manager-cert [options]"
def postOptions(self) -> None:
if self['name'] is None:
raise usage.UsageError(
"Must provide --name option"
)
if self['filename'] is None:
raise usage.UsageError(
"Must provide --filename option"
)
data: str
if self['filename'] == '-':
print("reading certificate from stdin", file=self.parent.parent.stderr)
data = self.parent.parent.stdin.read()
if len(data) == 0:
raise usage.UsageError(
"Reading certificate from stdin failed"
)
else:
with open(self['filename'], 'r') as f:
data = f.read()
try:
self.certificate_data = parse_grid_manager_certificate(data)
except ValueError as e:
raise usage.UsageError(
"Error parsing certificate: {}".format(e)
)
def getUsage(self, width=None):
t = BaseOptions.getUsage(self, width)
t += (
"Adds a Grid Manager certificate to a Storage Server.\n\n"
"The certificate will be copied into the base-dir and config\n"
"will be added to 'tahoe.cfg', which will be re-written. A\n"
"restart is required for changes to take effect.\n\n"
"The human who operates a Grid Manager would produce such a\n"
"certificate and communicate it securely to you.\n"
)
return t
def migrate_crawler(options):
out = options.stdout
storage = FilePath(options['basedir']).child("storage")
@ -116,6 +175,44 @@ def migrate_crawler(options):
print("Not found: '{}'".format(fp.path), file=out)
def add_grid_manager_cert(options):
"""
Add a new Grid Manager certificate to our config
"""
# XXX is there really not already a function for this?
if options.parent.parent['node-directory']:
nd = argv_to_abspath(options.parent.parent['node-directory'])
else:
nd = _default_nodedir
config = read_config(nd, "portnum")
cert_fname = "{}.cert".format(options['name'])
cert_path = FilePath(config.get_config_path(cert_fname))
cert_bytes = jsonbytes.dumps_bytes(options.certificate_data, indent=4) + b'\n'
cert_name = options['name']
if cert_path.exists():
msg = "Already have certificate for '{}' (at {})".format(
options['name'],
cert_path.path,
)
print(msg, file=options.stderr)
return 1
config.set_config("storage", "grid_management", "True")
config.set_config("grid_manager_certificates", cert_name, cert_fname)
# write all the data out
with cert_path.open("wb") as f:
f.write(cert_bytes)
cert_count = len(config.enumerate_section("grid_manager_certificates"))
print("There are now {} certificates".format(cert_count),
file=options.stderr)
return 0
class AdminCommand(BaseOptions):
subCommands = [
("generate-keypair", None, GenerateKeypairOptions,
@ -124,6 +221,9 @@ class AdminCommand(BaseOptions):
"Derive a public key from a private key."),
("migrate-crawler", None, MigrateCrawlerOptions,
"Write the crawler-history data as JSON."),
("add-grid-manager-cert", None, AddGridManagerCertOptions,
"Add a Grid Manager-provided certificate to a storage "
"server's config."),
]
def postOptions(self):
if not hasattr(self, 'subOptions'):
@ -138,12 +238,15 @@ each subcommand.
"""
return t
subDispatch = {
"generate-keypair": print_keypair,
"derive-pubkey": derive_pubkey,
"migrate-crawler": migrate_crawler,
"add-grid-manager-cert": add_grid_manager_cert,
}
def do_admin(options):
so = options.subOptions
so.stdout = options.stdout

View File

@ -165,6 +165,8 @@ def parse_or_exit(config, argv, stdout, stderr):
:return: ``config``, after using it to parse the argument list.
"""
try:
config.stdout = stdout
config.stderr = stderr
parse_options(argv[1:], config=config)
except usage.error as e:
# `parse_options` may have the side-effect of initializing a
@ -199,6 +201,7 @@ def dispatch(config,
so.stdout = stdout
so.stderr = stderr
so.stdin = stdin
config.stdin = stdin
if command in create_dispatch:
f = create_dispatch[command]

View File

@ -2,8 +2,6 @@
Type definitions used by modules in this package.
"""
# Python 3 only
from typing import List, Tuple, Type, Sequence, Any
from twisted.python.usage import Options

View File

@ -24,6 +24,7 @@ from twisted.internet.interfaces import (
from twisted.internet.address import IPv4Address, IPv6Address
from twisted.internet.defer import Deferred
from twisted.internet.ssl import CertificateOptions, Certificate, PrivateCertificate
from twisted.internet.interfaces import IReactorFromThreads
from twisted.web.server import Site, Request
from twisted.protocols.tls import TLSMemoryBIOFactory
from twisted.python.filepath import FilePath
@ -56,6 +57,8 @@ from .common import si_a2b
from .immutable import BucketWriter, ConflictingWriteError
from ..util.hashutil import timing_safe_compare
from ..util.base32 import rfc3548_alphabet
from ..util.deferredutil import async_to_deferred
from ..util.cputhreadpool import defer_to_thread
from allmydata.interfaces import BadWriteEnablerError
@ -486,8 +489,12 @@ class HTTPServer(object):
return str(failure.value).encode("utf-8")
def __init__(
self, storage_server, swissnum
): # type: (StorageServer, bytes) -> None
self,
reactor: IReactorFromThreads,
storage_server: StorageServer,
swissnum: bytes,
):
self._reactor = reactor
self._storage_server = storage_server
self._swissnum = swissnum
# Maps storage index to StorageIndexUploads:
@ -529,7 +536,7 @@ class HTTPServer(object):
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3861
raise _HTTPError(http.NOT_ACCEPTABLE)
def _read_encoded(
async def _read_encoded(
self, request, schema: Schema, max_size: int = 1024 * 1024
) -> Any:
"""
@ -542,10 +549,11 @@ class HTTPServer(object):
raise _HTTPError(http.UNSUPPORTED_MEDIA_TYPE)
# Make sure it's not too large:
request.content.seek(SEEK_END, 0)
if request.content.tell() > max_size:
request.content.seek(0, SEEK_END)
size = request.content.tell()
if size > max_size:
raise _HTTPError(http.REQUEST_ENTITY_TOO_LARGE)
request.content.seek(SEEK_SET, 0)
request.content.seek(0, SEEK_SET)
# We don't want to load the whole message into memory, cause it might
# be quite large. The CDDL validator takes a read-only bytes-like
@ -562,12 +570,21 @@ class HTTPServer(object):
message = mmap.mmap(fd, 0, access=mmap.ACCESS_READ)
else:
message = request.content.read()
# Pycddl will release the GIL when validating larger documents, so
# let's take advantage of multiple CPUs:
if size > 10_000:
await defer_to_thread(self._reactor, schema.validate_cbor, message)
else:
schema.validate_cbor(message)
# The CBOR parser will allocate more memory, but at least we can feed
# it the file-like object, so that if it's large it won't be make two
# copies.
request.content.seek(SEEK_SET, 0)
# Typically deserialization to Python will not release the GIL, and
# indeed as of Jan 2023 cbor2 didn't have any code to release the GIL
# in the decode path. As such, running it in a different thread has no benefit.
return cbor2.load(request.content)
##### Generic APIs #####
@ -585,10 +602,11 @@ class HTTPServer(object):
"/storage/v1/immutable/<storage_index:storage_index>",
methods=["POST"],
)
def allocate_buckets(self, request, authorization, storage_index):
@async_to_deferred
async def allocate_buckets(self, request, authorization, storage_index):
"""Allocate buckets."""
upload_secret = authorization[Secrets.UPLOAD]
info = self._read_encoded(request, _SCHEMAS["allocate_buckets"])
info = await self._read_encoded(request, _SCHEMAS["allocate_buckets"])
# We do NOT validate the upload secret for existing bucket uploads.
# Another upload may be happening in parallel, with a different upload
@ -610,7 +628,7 @@ class HTTPServer(object):
storage_index, share_number, upload_secret, bucket
)
return self._send_encoded(
return await self._send_encoded(
request,
{"already-have": set(already_got), "allocated": set(sharenum_to_bucket)},
)
@ -745,7 +763,8 @@ class HTTPServer(object):
"/storage/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>/corrupt",
methods=["POST"],
)
def advise_corrupt_share_immutable(
@async_to_deferred
async def advise_corrupt_share_immutable(
self, request, authorization, storage_index, share_number
):
"""Indicate that given share is corrupt, with a text reason."""
@ -754,7 +773,7 @@ class HTTPServer(object):
except KeyError:
raise _HTTPError(http.NOT_FOUND)
info = self._read_encoded(request, _SCHEMAS["advise_corrupt_share"])
info = await self._read_encoded(request, _SCHEMAS["advise_corrupt_share"])
bucket.advise_corrupt_share(info["reason"].encode("utf-8"))
return b""
@ -766,9 +785,10 @@ class HTTPServer(object):
"/storage/v1/mutable/<storage_index:storage_index>/read-test-write",
methods=["POST"],
)
def mutable_read_test_write(self, request, authorization, storage_index):
@async_to_deferred
async def mutable_read_test_write(self, request, authorization, storage_index):
"""Read/test/write combined operation for mutables."""
rtw_request = self._read_encoded(
rtw_request = await self._read_encoded(
request, _SCHEMAS["mutable_read_test_write"], max_size=2**48
)
secrets = (
@ -795,7 +815,9 @@ class HTTPServer(object):
)
except BadWriteEnablerError:
raise _HTTPError(http.UNAUTHORIZED)
return self._send_encoded(request, {"success": success, "data": read_data})
return await self._send_encoded(
request, {"success": success, "data": read_data}
)
@_authorized_route(
_app,
@ -840,7 +862,8 @@ class HTTPServer(object):
"/storage/v1/mutable/<storage_index:storage_index>/<int(signed=False):share_number>/corrupt",
methods=["POST"],
)
def advise_corrupt_share_mutable(
@async_to_deferred
async def advise_corrupt_share_mutable(
self, request, authorization, storage_index, share_number
):
"""Indicate that given share is corrupt, with a text reason."""
@ -849,7 +872,7 @@ class HTTPServer(object):
}:
raise _HTTPError(http.NOT_FOUND)
info = self._read_encoded(request, _SCHEMAS["advise_corrupt_share"])
info = await self._read_encoded(request, _SCHEMAS["advise_corrupt_share"])
self._storage_server.advise_corrupt_share(
b"mutable", storage_index, share_number, info["reason"].encode("utf-8")
)

View File

@ -34,8 +34,11 @@ from __future__ import annotations
from six import ensure_text
from typing import Union, Callable, Any, Optional
import re, time, hashlib
from os import urandom
import re
import time
import hashlib
from configparser import NoSectionError
import attr
@ -67,6 +70,12 @@ from allmydata.interfaces import (
IStorageServer,
IFoolscapStoragePlugin,
)
from allmydata.grid_manager import (
create_grid_manager_verifier,
)
from allmydata.crypto import (
ed25519,
)
from allmydata.util import log, base32, connection_status
from allmydata.util.assertutil import precondition
from allmydata.util.observer import ObserverList
@ -112,9 +121,15 @@ class StorageClientConfig(object):
:ivar dict[unicode, dict[unicode, unicode]] storage_plugins: A mapping from
names of ``IFoolscapStoragePlugin`` configured in *tahoe.cfg* to the
respective configuration.
:ivar list[ed25519.VerifyKey] grid_manager_keys: with no keys in
this list, we'll upload to any storage server. Otherwise, we will
only upload to a storage-server that has a valid certificate
signed by at least one of these keys.
"""
preferred_peers = attr.ib(default=())
storage_plugins = attr.ib(default=attr.Factory(dict))
grid_manager_keys = attr.ib(default=attr.Factory(list))
@classmethod
def from_node_config(cls, config):
@ -146,9 +161,17 @@ class StorageClientConfig(object):
plugin_config = []
storage_plugins[plugin_name] = dict(plugin_config)
grid_manager_keys = []
for name, gm_key in config.enumerate_section('grid_managers').items():
grid_manager_keys.append(
ed25519.verifying_key_from_string(gm_key.encode("ascii"))
)
return cls(
preferred_peers,
storage_plugins,
grid_manager_keys,
)
@ -269,10 +292,21 @@ class StorageFarmBroker(service.MultiService):
by the given announcement.
"""
assert isinstance(server_id, bytes)
gm_verifier = create_grid_manager_verifier(
self.storage_client_config.grid_manager_keys,
server["ann"].get("grid-manager-certificates", []),
"pub-{}".format(str(server_id, "ascii")), # server_id is v0-<key> not pub-v0-key .. for reasons?
)
if len(server["ann"].get(ANONYMOUS_STORAGE_NURLS, [])) > 0:
s = HTTPNativeStorageServer(server_id, server["ann"])
s = HTTPNativeStorageServer(
server_id,
server["ann"],
grid_manager_verifier=gm_verifier,
)
s.on_status_changed(lambda _: self._got_connection())
return s
handler_overrides = server.get("connections", {})
s = NativeStorageServer(
server_id,
@ -281,6 +315,7 @@ class StorageFarmBroker(service.MultiService):
handler_overrides,
self.node_config,
self.storage_client_config,
gm_verifier,
)
s.on_status_changed(lambda _: self._got_connection())
return s
@ -429,11 +464,26 @@ class StorageFarmBroker(service.MultiService):
for dsc in list(self.servers.values()):
dsc.try_to_connect()
def get_servers_for_psi(self, peer_selection_index):
def get_servers_for_psi(self, peer_selection_index, for_upload=False):
"""
:param for_upload: used to determine if we should include any
servers that are invalid according to Grid Manager
processing. When for_upload is True and we have any Grid
Manager keys configured, any storage servers with invalid or
missing certificates will be excluded.
"""
# return a list of server objects (IServers)
assert self.permute_peers == True
connected_servers = self.get_connected_servers()
preferred_servers = frozenset(s for s in connected_servers if s.get_longname() in self.preferred_peers)
if for_upload:
# print("upload processing: {}".format([srv.upload_permitted() for srv in connected_servers]))
connected_servers = [
srv
for srv in connected_servers
if srv.upload_permitted()
]
def _permuted(server):
seed = server.get_permutation_seed()
is_unpreferred = server not in preferred_servers
@ -609,9 +659,10 @@ class _FoolscapStorage(object):
{"permutation-seed-base32": "...",
"nickname": "...",
"grid-manager-certificates": [..],
}
*nickname* is optional.
*nickname* and *grid-manager-certificates* are optional.
The furl will be a Unicode string on Python 3; on Python 2 it will be
either a native (bytes) string or a Unicode string.
@ -741,7 +792,8 @@ class NativeStorageServer(service.MultiService):
"application-version": "unknown: no get_version()",
})
def __init__(self, server_id, ann, tub_maker, handler_overrides, node_config, config=StorageClientConfig()):
def __init__(self, server_id, ann, tub_maker, handler_overrides, node_config, config=None,
grid_manager_verifier=None):
service.MultiService.__init__(self)
assert isinstance(server_id, bytes)
self._server_id = server_id
@ -749,6 +801,11 @@ class NativeStorageServer(service.MultiService):
self._tub_maker = tub_maker
self._handler_overrides = handler_overrides
if config is None:
config = StorageClientConfig()
self._grid_manager_verifier = grid_manager_verifier
self._storage = self._make_storage_system(node_config, config, ann)
self.last_connect_time = None
@ -759,6 +816,21 @@ class NativeStorageServer(service.MultiService):
self._trigger_cb = None
self._on_status_changed = ObserverList()
def upload_permitted(self):
"""
If our client is configured with Grid Manager public-keys, we will
only upload to storage servers that have a currently-valid
certificate signed by at least one of the Grid Managers we
accept.
:return: True if we should use this server for uploads, False
otherwise.
"""
# if we have no Grid Manager keys configured, choice is easy
if self._grid_manager_verifier is None:
return True
return self._grid_manager_verifier()
def _make_storage_system(self, node_config, config, ann):
"""
:param allmydata.node._Config node_config: The node configuration to pass
@ -993,13 +1065,14 @@ class HTTPNativeStorageServer(service.MultiService):
"connected".
"""
def __init__(self, server_id: bytes, announcement, reactor=reactor):
def __init__(self, server_id: bytes, announcement, reactor=reactor, grid_manager_verifier=None):
service.MultiService.__init__(self)
assert isinstance(server_id, bytes)
self._server_id = server_id
self.announcement = announcement
self._on_status_changed = ObserverList()
self._reactor = reactor
self._grid_manager_verifier = grid_manager_verifier
furl = announcement["anonymous-storage-FURL"].encode("utf-8")
(
self._nickname,
@ -1048,6 +1121,21 @@ class HTTPNativeStorageServer(service.MultiService):
"""
return self._on_status_changed.subscribe(status_changed)
def upload_permitted(self):
"""
If our client is configured with Grid Manager public-keys, we will
only upload to storage servers that have a currently-valid
certificate signed by at least one of the Grid Managers we
accept.
:return: True if we should use this server for uploads, False
otherwise.
"""
# if we have no Grid Manager keys configured, choice is easy
if self._grid_manager_verifier is None:
return True
return self._grid_manager_verifier()
# Special methods used by copy.copy() and copy.deepcopy(). When those are
# used in allmydata.immutable.filenode to copy CheckResults during
# repair, we want it to treat the IServer instances as singletons, and

View File

@ -10,26 +10,33 @@ from future.utils import PY2
if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
from six.moves import StringIO
# We're going to override stdin/stderr, so want to match their behavior on respective Python versions.
from io import StringIO
from twisted.python.usage import (
UsageError,
)
from twisted.python.filepath import (
FilePath,
)
from testtools.matchers import (
Contains,
)
from twisted.python.filepath import (
FilePath,
)
from allmydata.scripts.admin import (
migrate_crawler,
add_grid_manager_cert,
)
from allmydata.scripts.runner import (
Options,
)
from allmydata.util import jsonbytes as json
from ..common import (
SyncTestCase,
)
class AdminMigrateCrawler(SyncTestCase):
"""
Tests related to 'tahoe admin migrate-crawler'
@ -85,3 +92,162 @@ class AdminMigrateCrawler(SyncTestCase):
str(options),
Contains("security issues with pickle")
)
# A structurally valid Grid Manager certificate used as a fixture by the
# option-parsing tests below.  The signature does not need to actually
# verify; only parsing and file round-tripping are exercised here.
fake_cert = {
    "certificate": "{\"expires\":1601687822,\"public_key\":\"pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga\",\"version\":1}",
    "signature": "fvjd3uvvupf2v6tnvkwjd473u3m3inyqkwiclhp7balmchkmn3px5pei3qyfjnhymq4cjcwvbpqmcwwnwswdtrfkpnlaxuih2zbdmda"
}
class AddCertificateOptions(SyncTestCase):
    """
    Tests for 'tahoe admin add-grid-manager-cert' option validation
    """

    def setUp(self):
        # A fresh top-level Options parser per test; individual tests
        # replace its stdin/stderr to drive and silence the command.
        self.tahoe = Options()
        return super(AddCertificateOptions, self).setUp()

    def test_parse_no_data(self):
        """
        When no data is passed to stdin an error is produced
        """
        self.tahoe.stdin = StringIO("")
        self.tahoe.stderr = StringIO()  # suppress message
        with self.assertRaises(UsageError) as ctx:
            self.tahoe.parseOptions(
                [
                    "admin", "add-grid-manager-cert",
                    "--name", "random-name",
                    "--filename", "-",
                ]
            )
        self.assertIn(
            "Reading certificate from stdin failed",
            str(ctx.exception)
        )

    def test_read_cert_file(self):
        """
        A certificate can be read from a file
        """
        tmp = self.mktemp()
        with open(tmp, "wb") as f:
            f.write(json.dumps_bytes(fake_cert))
        # certificate should be loaded
        self.tahoe.parseOptions(
            [
                "admin", "add-grid-manager-cert",
                "--name", "random-name",
                "--filename", tmp,
            ]
        )
        # drill down to the add-grid-manager-cert sub-sub-command options
        opts = self.tahoe.subOptions.subOptions
        self.assertEqual(
            fake_cert,
            opts.certificate_data
        )

    def test_bad_certificate(self):
        """
        Unparseable data produces an error
        """
        # "{}" is valid JSON but not a valid certificate (missing keys)
        self.tahoe.stdin = StringIO("{}")
        self.tahoe.stderr = StringIO()  # suppress message
        with self.assertRaises(UsageError) as ctx:
            self.tahoe.parseOptions(
                [
                    "admin", "add-grid-manager-cert",
                    "--name", "random-name",
                    "--filename", "-",
                ]
            )
        self.assertIn(
            "Grid Manager certificate must contain",
            str(ctx.exception)
        )
class AddCertificateCommand(SyncTestCase):
    """
    Tests for 'tahoe admin add-grid-manager-cert' operation
    """

    def setUp(self):
        self.tahoe = Options()
        # A minimal node directory containing only a tahoe.cfg, so the
        # command has somewhere to write certificate files and config.
        self.node_path = FilePath(self.mktemp())
        self.node_path.makedirs()
        with self.node_path.child("tahoe.cfg").open("w") as f:
            f.write(b"# minimal test config\n")
        return super(AddCertificateCommand, self).setUp()

    def test_add_one(self):
        """
        Adding a certificate succeeds
        """
        self.tahoe.stdin = StringIO(json.dumps(fake_cert))
        self.tahoe.stderr = StringIO()
        self.tahoe.parseOptions(
            [
                "--node-directory", self.node_path.path,
                "admin", "add-grid-manager-cert",
                "--name", "zero",
                "--filename", "-",
            ]
        )
        # parseOptions creates fresh sub-option objects; propagate our
        # replacement stdin/stderr down to the one actually executed.
        self.tahoe.subOptions.subOptions.stdin = self.tahoe.stdin
        self.tahoe.subOptions.subOptions.stderr = self.tahoe.stderr
        rc = add_grid_manager_cert(self.tahoe.subOptions.subOptions)
        self.assertEqual(rc, 0)
        # the certificate file is written next to tahoe.cfg
        self.assertEqual(
            {"zero.cert", "tahoe.cfg"},
            set(self.node_path.listdir())
        )
        self.assertIn(
            "There are now 1 certificates",
            self.tahoe.stderr.getvalue()
        )

    def test_add_two(self):
        """
        An error message is produced when adding a certificate with a
        duplicate name.
        """
        self.tahoe.stdin = StringIO(json.dumps(fake_cert))
        self.tahoe.stderr = StringIO()
        self.tahoe.parseOptions(
            [
                "--node-directory", self.node_path.path,
                "admin", "add-grid-manager-cert",
                "--name", "zero",
                "--filename", "-",
            ]
        )
        self.tahoe.subOptions.subOptions.stdin = self.tahoe.stdin
        self.tahoe.subOptions.subOptions.stderr = self.tahoe.stderr
        rc = add_grid_manager_cert(self.tahoe.subOptions.subOptions)
        self.assertEqual(rc, 0)

        # second attempt with the same --name: stdin was consumed above,
        # so re-create it before re-parsing.
        self.tahoe.stdin = StringIO(json.dumps(fake_cert))
        self.tahoe.parseOptions(
            [
                "--node-directory", self.node_path.path,
                "admin", "add-grid-manager-cert",
                "--name", "zero",
                "--filename", "-",
            ]
        )
        self.tahoe.subOptions.subOptions.stdin = self.tahoe.stdin
        self.tahoe.subOptions.subOptions.stderr = self.tahoe.stderr
        rc = add_grid_manager_cert(self.tahoe.subOptions.subOptions)
        self.assertEqual(rc, 1)
        self.assertIn(
            "Already have certificate for 'zero'",
            self.tahoe.stderr.getvalue()
        )

View File

@ -0,0 +1,314 @@
"""
Tests for the grid manager CLI.
"""
import os
from io import (
BytesIO,
)
from unittest import (
skipIf,
)
from twisted.trial.unittest import (
TestCase,
)
from allmydata.cli.grid_manager import (
grid_manager,
)
import click.testing
# these imports support the tests for `tahoe *` subcommands
from ..common_util import (
run_cli,
)
from twisted.internet.defer import (
inlineCallbacks,
)
from twisted.python.filepath import (
FilePath,
)
from twisted.python.runtime import (
platform,
)
from allmydata.util import jsonbytes as json
class GridManagerCommandLine(TestCase):
    """
    Test the mechanics of the `grid-manager` command
    """

    def setUp(self):
        self.runner = click.testing.CliRunner()
        super(GridManagerCommandLine, self).setUp()

    def invoke_and_check(self, *args, **kwargs):
        """Invoke a command with the runner and ensure it succeeded."""
        result = self.runner.invoke(*args, **kwargs)
        if result.exception is not None:
            # Re-raise with the original traceback so the failure points
            # at the real problem instead of click's wrapper.
            raise result.exc_info[1].with_traceback(result.exc_info[2])
        self.assertEqual(result.exit_code, 0, result)
        return result

    def test_create(self):
        """
        Create a new grid-manager
        """
        with self.runner.isolated_filesystem():
            result = self.invoke_and_check(grid_manager, ["--config", "foo", "create"])
            self.assertEqual(["foo"], os.listdir("."))
            self.assertEqual(["config.json"], os.listdir("./foo"))

            result = self.invoke_and_check(grid_manager, ["--config", "foo", "public-identity"])
            self.assertTrue(result.output.startswith("pub-v0-"))

    def test_load_invalid(self):
        """
        An invalid config is reported to the user
        """
        with self.runner.isolated_filesystem():
            with open("config.json", "wb") as f:
                f.write(json.dumps_bytes({"not": "valid"}))
            result = self.runner.invoke(grid_manager, ["--config", ".", "public-identity"])
            self.assertNotEqual(result.exit_code, 0)
            self.assertIn(
                "Error loading Grid Manager",
                result.output,
            )

    def test_create_already(self):
        """
        It's an error to create a new grid-manager in an existing
        directory.
        """
        with self.runner.isolated_filesystem():
            result = self.invoke_and_check(grid_manager, ["--config", "foo", "create"])
            result = self.runner.invoke(grid_manager, ["--config", "foo", "create"])
            self.assertEqual(1, result.exit_code)
            self.assertIn(
                "Can't create",
                result.stdout,
            )

    def test_create_stdout(self):
        """
        Create a new grid-manager with no files
        """
        with self.runner.isolated_filesystem():
            result = self.invoke_and_check(grid_manager, ["--config", "-", "create"])
            # "--config -" means "don't persist anything to disk"
            self.assertEqual([], os.listdir("."))
            config = json.loads(result.output)
            self.assertEqual(
                {"private_key", "grid_manager_config_version"},
                set(config.keys()),
            )

    def test_list_stdout(self):
        """
        Load Grid Manager without files (using 'list' subcommand, but any will do)
        """
        config = {
            "storage_servers": {
                "storage0": {
                    "public_key": "pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga"
                }
            },
            "private_key": "priv-v0-6uinzyaxy3zvscwgsps5pxcfezhrkfb43kvnrbrhhfzyduyqnniq",
            "grid_manager_config_version": 0
        }
        result = self.invoke_and_check(
            grid_manager, ["--config", "-", "list"],
            input=BytesIO(json.dumps_bytes(config)),
        )
        self.assertEqual(result.exit_code, 0)
        self.assertEqual(
            "storage0: pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga\n",
            result.output,
        )

    def test_add_and_sign(self):
        """
        Add a new storage-server and sign a certificate for it
        """
        pubkey = "pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga"
        with self.runner.isolated_filesystem():
            self.invoke_and_check(grid_manager, ["--config", "foo", "create"])
            self.invoke_and_check(grid_manager, ["--config", "foo", "add", "storage0", pubkey])
            result = self.invoke_and_check(grid_manager, ["--config", "foo", "sign", "storage0", "10"])
            sigcert = json.loads(result.output)
            self.assertEqual({"certificate", "signature"}, set(sigcert.keys()))
            # the "certificate" value is itself JSON, carrying the pubkey
            cert = json.loads(sigcert['certificate'])
            self.assertEqual(cert["public_key"], pubkey)

    def test_add_and_sign_second_cert(self):
        """
        Add a new storage-server and sign two certificates.
        """
        pubkey = "pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga"
        with self.runner.isolated_filesystem():
            self.invoke_and_check(grid_manager, ["--config", "foo", "create"])
            self.invoke_and_check(grid_manager, ["--config", "foo", "add", "storage0", pubkey])
            self.invoke_and_check(grid_manager, ["--config", "foo", "sign", "storage0", "10"])
            self.invoke_and_check(grid_manager, ["--config", "foo", "sign", "storage0", "10"])
            # we should now have two certificates stored
            self.assertEqual(
                set(FilePath("foo").listdir()),
                {'storage0.cert.1', 'storage0.cert.0', 'config.json'},
            )

    def test_add_twice(self):
        """
        An error is reported trying to add an existing server
        """
        pubkey0 = "pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga"
        pubkey1 = "pub-v0-5ysc55trfvfvg466v46j4zmfyltgus3y2gdejifctv7h4zkuyveq"
        with self.runner.isolated_filesystem():
            self.invoke_and_check(grid_manager, ["--config", "foo", "create"])
            self.invoke_and_check(grid_manager, ["--config", "foo", "add", "storage0", pubkey0])
            result = self.runner.invoke(grid_manager, ["--config", "foo", "add", "storage0", pubkey1])
            self.assertNotEqual(result.exit_code, 0)
            self.assertIn(
                "A storage-server called 'storage0' already exists",
                result.output,
            )

    def test_add_list_remove(self):
        """
        Add a storage server, list it, remove it.
        """
        pubkey = "pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga"
        with self.runner.isolated_filesystem():
            self.invoke_and_check(grid_manager, ["--config", "foo", "create"])
            self.invoke_and_check(grid_manager, ["--config", "foo", "add", "storage0", pubkey])
            self.invoke_and_check(grid_manager, ["--config", "foo", "sign", "storage0", "1"])

            result = self.invoke_and_check(grid_manager, ["--config", "foo", "list"])
            names = [
                line.split(':')[0]
                for line in result.output.strip().split('\n')
                if not line.startswith(" ")  # "cert" lines start with whitespace
            ]
            self.assertEqual(names, ["storage0"])

            self.invoke_and_check(grid_manager, ["--config", "foo", "remove", "storage0"])

            result = self.invoke_and_check(grid_manager, ["--config", "foo", "list"])
            self.assertEqual(result.output.strip(), "")

    def test_remove_missing(self):
        """
        Error reported when removing non-existant server
        """
        with self.runner.isolated_filesystem():
            self.invoke_and_check(grid_manager, ["--config", "foo", "create"])
            result = self.runner.invoke(grid_manager, ["--config", "foo", "remove", "storage0"])
            self.assertNotEqual(result.exit_code, 0)
            self.assertIn(
                "No storage-server called 'storage0' exists",
                result.output,
            )

    def test_sign_missing(self):
        """
        Error reported when signing non-existant server
        """
        with self.runner.isolated_filesystem():
            self.invoke_and_check(grid_manager, ["--config", "foo", "create"])
            result = self.runner.invoke(grid_manager, ["--config", "foo", "sign", "storage0", "42"])
            self.assertNotEqual(result.exit_code, 0)
            self.assertIn(
                "No storage-server called 'storage0' exists",
                result.output,
            )

    @skipIf(not platform.isLinux(), "I only know how permissions work on linux")
    def test_sign_bad_perms(self):
        """
        Error reported if we can't create certificate file
        """
        pubkey = "pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga"
        with self.runner.isolated_filesystem():
            self.invoke_and_check(grid_manager, ["--config", "foo", "create"])
            self.invoke_and_check(grid_manager, ["--config", "foo", "add", "storage0", pubkey])
            # make the directory un-writable (so we can't create a new cert)
            os.chmod("foo", 0o550)
            try:
                result = self.runner.invoke(grid_manager, ["--config", "foo", "sign", "storage0", "42"])
                self.assertEqual(result.exit_code, 1)
                self.assertIn(
                    "Permission denied",
                    result.output,
                )
            finally:
                # restore write permission so the isolated filesystem can
                # be cleaned up without errors
                os.chmod("foo", 0o750)
class TahoeAddGridManagerCert(TestCase):
    """
    Test `tahoe admin add-grid-manager-cert` subcommand
    """

    @inlineCallbacks
    def test_help(self):
        """
        some kind of help is printed
        """
        code, out, err = yield run_cli("admin", "add-grid-manager-cert")
        self.assertEqual(err, "")
        # missing required options -> non-zero exit
        self.assertNotEqual(0, code)

    @inlineCallbacks
    def test_no_name(self):
        """
        error to miss --name option
        """
        code, out, err = yield run_cli(
            "admin", "add-grid-manager-cert", "--filename", "-",
            stdin=b"the cert",
        )
        self.assertIn(
            "Must provide --name",
            out
        )

    @inlineCallbacks
    def test_no_filename(self):
        """
        error to miss --filename option
        """
        code, out, err = yield run_cli(
            "admin", "add-grid-manager-cert", "--name", "foo",
            stdin=b"the cert",
        )
        self.assertIn(
            "Must provide --filename",
            out
        )

    @inlineCallbacks
    def test_add_one(self):
        """
        we can add a certificate
        """
        nodedir = self.mktemp()
        fake_cert = b"""{"certificate": "", "signature": ""}"""

        code, out, err = yield run_cli(
            "--node-directory", nodedir,
            "admin", "add-grid-manager-cert", "-f", "-", "--name", "foo",
            stdin=fake_cert,
            ignore_stderr=True,
        )
        nodepath = FilePath(nodedir)
        with nodepath.child("tahoe.cfg").open("r") as f:
            config_data = f.read()

        # the cert is stored in its own file and referenced from tahoe.cfg
        self.assertIn("tahoe.cfg", nodepath.listdir())
        self.assertIn(
            b"foo = foo.cert",
            config_data,
        )
        self.assertIn("foo.cert", nodepath.listdir())
        with nodepath.child("foo.cert").open("r") as f:
            self.assertEqual(
                json.load(f),
                json.loads(fake_cert)
            )

View File

@ -681,6 +681,9 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
# test code.
FORCE_FOOLSCAP_FOR_STORAGE : Optional[bool] = None
# If True, reduce the timeout on connections:
REDUCE_HTTP_CLIENT_TIMEOUT : bool = True
def setUp(self):
self._http_client_pools = []
http_client.StorageClient.start_test_mode(self._got_new_http_connection_pool)
@ -707,6 +710,7 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
d.addTimeout(1, reactor)
return d
if self.REDUCE_HTTP_CLIENT_TIMEOUT:
pool.getConnection = getConnectionWithTimeout
def close_idle_http_connections(self):

View File

@ -134,6 +134,7 @@ def run_cli_native(verb, *args, **kwargs):
stdin = TextIOWrapper(BytesIO(stdin), encoding)
stdout = TextIOWrapper(BytesIO(), encoding)
stderr = TextIOWrapper(BytesIO(), encoding)
options.stdin = stdin
d = defer.succeed(argv)
d.addCallback(
partial(

View File

@ -195,6 +195,10 @@ class NoNetworkServer(object):
return self
def __deepcopy__(self, memodict):
return self
def upload_permitted(self):
    # Test double: no Grid Manager checking in the no-network grid, so
    # uploads are always permitted.
    return True
def get_serverid(self):
return self.serverid
def get_permutation_seed(self):
@ -225,7 +229,7 @@ class NoNetworkServer(object):
@implementer(IStorageBroker)
class NoNetworkStorageBroker(object): # type: ignore # missing many methods
def get_servers_for_psi(self, peer_selection_index):
def get_servers_for_psi(self, peer_selection_index, for_upload=True):
def _permuted(server):
seed = server.get_permutation_seed()
return permute_server_hash(peer_selection_index, seed)

View File

@ -26,6 +26,11 @@ from ..uri import (
MDMFDirectoryURI,
)
from allmydata.util.base32 import (
b2a,
)
def write_capabilities():
"""
Build ``IURI`` providers representing all kinds of write capabilities.
@ -121,6 +126,7 @@ def dir2_mdmf_capabilities():
mdmf_capabilities(),
)
def offsets(min_value=0, max_value=2 ** 16):
"""
Build ``int`` values that could be used as valid offsets into a sequence
@ -134,3 +140,13 @@ def lengths(min_value=1, max_value=2 ** 16):
share data in a share file).
"""
return integers(min_value, max_value)
def base32text():
    """
    Build text()s that are valid base32
    """
    # Encode arbitrary binary() values with the project's base32 encoder,
    # guaranteeing every produced string is valid base32 by construction.
    return builds(
        lambda b: str(b2a(b), "ascii"),
        binary(),
    )

View File

@ -10,7 +10,8 @@ from future.utils import PY2
if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
import os, sys
import os
import sys
from functools import (
partial,
)
@ -46,6 +47,7 @@ from testtools.matchers import (
AfterPreprocessing,
MatchesListwise,
MatchesDict,
ContainsDict,
Always,
Is,
raises,
@ -72,6 +74,7 @@ from allmydata.util import (
fileutil,
encodingutil,
configutil,
jsonbytes as json,
)
from allmydata.util.eliotutil import capture_logging
from allmydata.util.fileutil import abspath_expanduser_unicode
@ -1508,3 +1511,45 @@ enabled = {storage_enabled}
),
),
)
def test_announcement_includes_grid_manager(self):
    """
    When Grid Manager is enabled certificates are included in the
    announcement
    """
    # pre-signed certificate fixture; only announcement plumbing is
    # under test, not signature validity
    fake_cert = {
        "certificate": "{\"expires\":1601687822,\"public_key\":\"pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga\",\"version\":1}",
        "signature": "fvjd3uvvupf2v6tnvkwjd473u3m3inyqkwiclhp7balmchkmn3px5pei3qyfjnhymq4cjcwvbpqmcwwnwswdtrfkpnlaxuih2zbdmda",
    }
    with self.basedir.child("zero.cert").open("w") as f:
        f.write(json.dumps_bytes(fake_cert))
    with self.basedir.child("gm0.cert").open("w") as f:
        f.write(json.dumps_bytes(fake_cert))

    # config references only "zero.cert" (as "foo"); grid_management is
    # enabled in the storage section
    config = client.config_from_string(
        self.basedir.path,
        "tub.port",
        self.get_config(
            storage_enabled=True,
            more_storage="grid_management = True",
            more_sections=(
                "[grid_managers]\n"
                "gm0 = pub-v0-ibpbsexcjfbv3ni7gwlclgn6mldaqnqd5mrtan2fnq2b27xnovca\n"
                "[grid_manager_certificates]\n"
                "foo = zero.cert\n"
            )
        ),
    )

    self.assertThat(
        client.create_client_from_config(
            config,
            _introducer_factory=MemoryIntroducerClient,
        ),
        succeeded(AfterPreprocessing(
            lambda client: get_published_announcements(client)[0].ann,
            ContainsDict({
                "grid-manager-certificates": Equals([fake_cert]),
            }),
        )),
    )

View File

@ -0,0 +1,455 @@
"""
Tests for the grid manager.
"""
from datetime import (
timedelta,
)
from twisted.python.filepath import (
FilePath,
)
from hypothesis import given
from allmydata.node import (
config_from_string,
)
from allmydata.client import (
_valid_config as client_valid_config,
)
from allmydata.crypto import (
ed25519,
)
from allmydata.util import (
jsonbytes as json,
)
from allmydata.grid_manager import (
load_grid_manager,
save_grid_manager,
create_grid_manager,
parse_grid_manager_certificate,
create_grid_manager_verifier,
SignedCertificate,
)
from allmydata.test.strategies import (
base32text,
)
from .common import SyncTestCase
class GridManagerUtilities(SyncTestCase):
    """
    Confirm operation of utility functions used by GridManager
    """

    def test_load_certificates(self):
        """
        Grid Manager certificates are deserialized from config properly
        """
        cert_path = self.mktemp()
        # pre-signed fixture; only config plumbing is tested here
        fake_cert = {
            "certificate": "{\"expires\":1601687822,\"public_key\":\"pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga\",\"version\":1}",
            "signature": "fvjd3uvvupf2v6tnvkwjd473u3m3inyqkwiclhp7balmchkmn3px5pei3qyfjnhymq4cjcwvbpqmcwwnwswdtrfkpnlaxuih2zbdmda"
        }
        with open(cert_path, "wb") as f:
            f.write(json.dumps_bytes(fake_cert))
        config_data = (
            "[grid_managers]\n"
            "fluffy = pub-v0-vqimc4s5eflwajttsofisp5st566dbq36xnpp4siz57ufdavpvlq\n"
            "[grid_manager_certificates]\n"
            "ding = {}\n".format(cert_path)
        )
        config = config_from_string("/foo", "portnum", config_data, client_valid_config())
        self.assertEqual(
            {"fluffy": "pub-v0-vqimc4s5eflwajttsofisp5st566dbq36xnpp4siz57ufdavpvlq"},
            config.enumerate_section("grid_managers")
        )
        certs = config.get_grid_manager_certificates()
        self.assertEqual([fake_cert], certs)

    def test_load_certificates_invalid_version(self):
        """
        An error is reported loading invalid certificate version
        """
        gm_path = FilePath(self.mktemp())
        gm_path.makedirs()
        config = {
            "grid_manager_config_version": 0,
            "private_key": "priv-v0-ub7knkkmkptqbsax4tznymwzc4nk5lynskwjsiubmnhcpd7lvlqa",
            "storage_servers": {
                "radia": {
                    "public_key": "pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga"
                }
            }
        }
        with gm_path.child("config.json").open("wb") as f:
            f.write(json.dumps_bytes(config))

        # the embedded certificate claims an unsupported "version": 22
        fake_cert = {
            "certificate": "{\"expires\":1601687822,\"public_key\":\"pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga\",\"version\":22}",
            "signature": "fvjd3uvvupf2v6tnvkwjd473u3m3inyqkwiclhp7balmchkmn3px5pei3qyfjnhymq4cjcwvbpqmcwwnwswdtrfkpnlaxuih2zbdmda"
        }
        with gm_path.child("radia.cert.0").open("wb") as f:
            f.write(json.dumps_bytes(fake_cert))

        with self.assertRaises(ValueError) as ctx:
            load_grid_manager(gm_path)
        # the unsupported version number should be mentioned in the error
        self.assertIn(
            "22",
            str(ctx.exception),
        )

    def test_load_certificates_unknown_key(self):
        """
        An error is reported loading certificates with invalid keys in them
        """
        cert_path = self.mktemp()
        fake_cert = {
            "certificate": "{\"expires\":1601687822,\"public_key\":\"pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga\",\"version\":22}",
            "signature": "fvjd3uvvupf2v6tnvkwjd473u3m3inyqkwiclhp7balmchkmn3px5pei3qyfjnhymq4cjcwvbpqmcwwnwswdtrfkpnlaxuih2zbdmda",
            "something-else": "not valid in a v0 certificate"
        }
        with open(cert_path, "wb") as f:
            f.write(json.dumps_bytes(fake_cert))
        config_data = (
            "[grid_manager_certificates]\n"
            "ding = {}\n".format(cert_path)
        )
        config = config_from_string("/foo", "portnum", config_data, client_valid_config())
        with self.assertRaises(ValueError) as ctx:
            config.get_grid_manager_certificates()
        self.assertIn(
            "Unknown key in Grid Manager certificate",
            str(ctx.exception)
        )

    def test_load_certificates_missing(self):
        """
        An error is reported for missing certificates
        """
        cert_path = self.mktemp()
        # note: nothing is ever written to cert_path
        config_data = (
            "[grid_managers]\n"
            "fluffy = pub-v0-vqimc4s5eflwajttsofisp5st566dbq36xnpp4siz57ufdavpvlq\n"
            "[grid_manager_certificates]\n"
            "ding = {}\n".format(cert_path)
        )
        config = config_from_string("/foo", "portnum", config_data, client_valid_config())
        with self.assertRaises(ValueError) as ctx:
            config.get_grid_manager_certificates()
        # we don't reliably know how Windows or MacOS will represent
        # the path in the exception, so we don't check for the *exact*
        # message with full-path here..
        self.assertIn(
            "Grid Manager certificate file",
            str(ctx.exception)
        )
        self.assertIn(
            " doesn't exist",
            str(ctx.exception)
        )
class GridManagerVerifier(SyncTestCase):
    """
    Tests related to rejecting or accepting Grid Manager certificates.
    """

    def setUp(self):
        # a fresh Grid Manager (with its own signing keypair) per test
        self.gm = create_grid_manager()
        return super(GridManagerVerifier, self).setUp()

    def test_sign_cert(self):
        """
        For a storage server previously added to a grid manager,
        _GridManager.sign returns a dict with "certificate" and
        "signature" properties where the value of "signature" gives
        the ed25519 signature (using the grid manager's private key of
        the value) of "certificate".
        """
        priv, pub = ed25519.create_signing_keypair()
        self.gm.add_storage_server("test", pub)
        cert0 = self.gm.sign("test", timedelta(seconds=86400))
        cert1 = self.gm.sign("test", timedelta(seconds=3600))
        # different expiry times -> different certificates
        self.assertNotEqual(cert0, cert1)

        self.assertIsInstance(cert0, SignedCertificate)
        gm_key = ed25519.verifying_key_from_string(self.gm.public_identity())
        # verify_signature raises on failure and returns None on success
        self.assertEqual(
            ed25519.verify_signature(
                gm_key,
                cert0.signature,
                cert0.certificate,
            ),
            None
        )

    def test_sign_cert_wrong_name(self):
        """
        Try to sign a storage-server that doesn't exist
        """
        with self.assertRaises(KeyError):
            self.gm.sign("doesn't exist", timedelta(seconds=86400))

    def test_add_cert(self):
        """
        Add a storage-server and serialize it
        """
        priv, pub = ed25519.create_signing_keypair()
        self.gm.add_storage_server("test", pub)

        data = self.gm.marshal()
        self.assertEqual(
            data["storage_servers"],
            {
                "test": {
                    "public_key": ed25519.string_from_verifying_key(pub),
                }
            }
        )

    def test_remove(self):
        """
        Add then remove a storage-server
        """
        priv, pub = ed25519.create_signing_keypair()
        self.gm.add_storage_server("test", pub)
        self.gm.remove_storage_server("test")
        self.assertEqual(len(self.gm.storage_servers), 0)

    def test_serialize(self):
        """
        Write and then read a Grid Manager config
        """
        priv0, pub0 = ed25519.create_signing_keypair()
        priv1, pub1 = ed25519.create_signing_keypair()
        self.gm.add_storage_server("test0", pub0)
        self.gm.add_storage_server("test1", pub1)

        tempdir = self.mktemp()
        fp = FilePath(tempdir)

        save_grid_manager(fp, self.gm)
        gm2 = load_grid_manager(fp)
        # the round-tripped Grid Manager must match the original:
        # identity, server set, and full marshalled form
        self.assertEqual(
            self.gm.public_identity(),
            gm2.public_identity(),
        )
        self.assertEqual(
            len(self.gm.storage_servers),
            len(gm2.storage_servers),
        )
        for name, ss0 in list(self.gm.storage_servers.items()):
            ss1 = gm2.storage_servers[name]
            self.assertEqual(ss0.name, ss1.name)
            self.assertEqual(ss0.public_key_string(), ss1.public_key_string())
        self.assertEqual(self.gm.marshal(), gm2.marshal())

    def test_invalid_no_version(self):
        """
        Invalid Grid Manager config with no version
        """
        tempdir = self.mktemp()
        fp = FilePath(tempdir)
        bad_config = {
            "private_key": "at least we have one",
        }
        fp.makedirs()
        with fp.child("config.json").open("w") as f:
            f.write(json.dumps_bytes(bad_config))

        with self.assertRaises(ValueError) as ctx:
            load_grid_manager(fp)
        self.assertIn(
            "unknown version",
            str(ctx.exception),
        )

    def test_invalid_certificate_bad_version(self):
        """
        Invalid Grid Manager config containing a certificate with an
        illegal version
        """
        tempdir = self.mktemp()
        fp = FilePath(tempdir)
        config = {
            "grid_manager_config_version": 0,
            "private_key": "priv-v0-ub7knkkmkptqbsax4tznymwzc4nk5lynskwjsiubmnhcpd7lvlqa",
            "storage_servers": {
                "alice": {
                    "public_key": "pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga"
                }
            }
        }
        # "version": 0 inside the cert body is not a legal cert version
        bad_cert = {
            "certificate": "{\"expires\":1601687822,\"public_key\":\"pub-v0-cbq6hcf3pxcz6ouoafrbktmkixkeuywpcpbcomzd3lqbkq4nmfga\",\"version\":0}",
            "signature": "fvjd3uvvupf2v6tnvkwjd473u3m3inyqkwiclhp7balmchkmn3px5pei3qyfjnhymq4cjcwvbpqmcwwnwswdtrfkpnlaxuih2zbdmda"
        }

        fp.makedirs()
        with fp.child("config.json").open("w") as f:
            f.write(json.dumps_bytes(config))
        with fp.child("alice.cert.0").open("w") as f:
            f.write(json.dumps_bytes(bad_cert))

        with self.assertRaises(ValueError) as ctx:
            load_grid_manager(fp)

        self.assertIn(
            "Unknown certificate version",
            str(ctx.exception),
        )

    def test_invalid_no_private_key(self):
        """
        Invalid Grid Manager config with no private key
        """
        tempdir = self.mktemp()
        fp = FilePath(tempdir)
        bad_config = {
            "grid_manager_config_version": 0,
        }
        fp.makedirs()
        with fp.child("config.json").open("w") as f:
            f.write(json.dumps_bytes(bad_config))

        with self.assertRaises(ValueError) as ctx:
            load_grid_manager(fp)
        self.assertIn(
            "'private_key' required",
            str(ctx.exception),
        )

    def test_invalid_bad_private_key(self):
        """
        Invalid Grid Manager config with bad private-key
        """
        tempdir = self.mktemp()
        fp = FilePath(tempdir)
        bad_config = {
            "grid_manager_config_version": 0,
            "private_key": "not actually encoded key",
        }
        fp.makedirs()
        with fp.child("config.json").open("w") as f:
            f.write(json.dumps_bytes(bad_config))

        with self.assertRaises(ValueError) as ctx:
            load_grid_manager(fp)
        self.assertIn(
            "Invalid Grid Manager private_key",
            str(ctx.exception),
        )

    def test_invalid_storage_server(self):
        """
        Invalid Grid Manager config with missing public-key for
        storage-server
        """
        tempdir = self.mktemp()
        fp = FilePath(tempdir)
        bad_config = {
            "grid_manager_config_version": 0,
            "private_key": "priv-v0-ub7knkkmkptqbsax4tznymwzc4nk5lynskwjsiubmnhcpd7lvlqa",
            "storage_servers": {
                "bad": {}
            }
        }
        fp.makedirs()
        with fp.child("config.json").open("w") as f:
            f.write(json.dumps_bytes(bad_config))

        with self.assertRaises(ValueError) as ctx:
            load_grid_manager(fp)
        self.assertIn(
            "No 'public_key' for storage server",
            str(ctx.exception),
        )

    def test_parse_cert(self):
        """
        Parse an ostensibly valid storage certificate
        """
        js = parse_grid_manager_certificate('{"certificate": "", "signature": ""}')
        self.assertEqual(
            set(js.keys()),
            {"certificate", "signature"}
        )
        # the signature isn't *valid*, but that's checked in a
        # different function

    def test_parse_cert_not_dict(self):
        """
        Certificate data not even a dict
        """
        with self.assertRaises(ValueError) as ctx:
            parse_grid_manager_certificate("[]")
        self.assertIn(
            "must be a dict",
            str(ctx.exception),
        )

    def test_parse_cert_missing_signature(self):
        """
        Missing the signature
        """
        with self.assertRaises(ValueError) as ctx:
            parse_grid_manager_certificate('{"certificate": ""}')
        self.assertIn(
            "must contain",
            str(ctx.exception),
        )

    def test_validate_cert(self):
        """
        Validate a correctly-signed certificate
        """
        priv0, pub0 = ed25519.create_signing_keypair()
        self.gm.add_storage_server("test0", pub0)
        cert0 = self.gm.sign("test0", timedelta(seconds=86400))

        verify = create_grid_manager_verifier(
            [self.gm._public_key],
            [cert0],
            ed25519.string_from_verifying_key(pub0),
        )

        self.assertTrue(verify())
class GridManagerInvalidVerifier(SyncTestCase):
    """
    Invalid certificate rejection tests
    """

    def setUp(self):
        # one Grid Manager with a single signed storage-server cert;
        # the tests then corrupt the signature
        self.gm = create_grid_manager()
        self.priv0, self.pub0 = ed25519.create_signing_keypair()
        self.gm.add_storage_server("test0", self.pub0)
        self.cert0 = self.gm.sign("test0", timedelta(seconds=86400))
        return super(GridManagerInvalidVerifier, self).setUp()

    @given(
        base32text(),
    )
    def test_validate_cert_invalid(self, invalid_signature):
        """
        An incorrect signature is rejected
        """
        # make signature invalid
        invalid_cert = SignedCertificate(
            self.cert0.certificate,
            invalid_signature.encode("ascii"),
        )
        verify = create_grid_manager_verifier(
            [self.gm._public_key],
            [invalid_cert],
            ed25519.string_from_verifying_key(self.pub0),
            # swallow the bad-certificate notification; we only care
            # about the returned verdict (PEP 8: no spaces around "="
            # in a keyword argument)
            bad_cert=lambda key, cert: None,
        )
        self.assertFalse(verify())

View File

@ -261,6 +261,20 @@ class TestCase(testutil.SignalMixin, unittest.TestCase):
with self.assertRaises(MissingConfigEntry):
config.get_config("node", "log_gatherer.furl")
def test_missing_config_section(self):
    """
    Enumerating a missing section returns empty dict
    """
    basedir = self.mktemp()
    fileutil.make_dirs(basedir)
    # create an empty tahoe.cfg so read_config succeeds
    with open(os.path.join(basedir, 'tahoe.cfg'), 'w'):
        pass
    config = read_config(basedir, "")
    # use the non-deprecated assertEqual (assertEquals is a deprecated
    # alias, removed in Python 3.12)
    self.assertEqual(
        config.enumerate_section("not-a-section"),
        {}
    )
def test_config_required(self):
"""
Asking for missing (but required) configuration is an error

View File

@ -18,10 +18,12 @@ sadly, an internal implementation detail of Twisted being leaked to tests...
For definitely synchronous calls, you can just use ``result_of()``.
"""
import time
from base64 import b64encode
from contextlib import contextmanager
from os import urandom
from typing import Union, Callable, Tuple, Iterable
from queue import Queue
from cbor2 import dumps
from pycddl import ValidationError as CDDLValidationError
from hypothesis import assume, given, strategies as st
@ -31,13 +33,14 @@ from klein import Klein
from hyperlink import DecodedURL
from collections_extended import RangeMap
from twisted.internet.task import Clock, Cooperator
from twisted.internet.interfaces import IReactorTime
from twisted.internet.interfaces import IReactorTime, IReactorFromThreads
from twisted.internet.defer import CancelledError, Deferred
from twisted.web import http
from twisted.web.http_headers import Headers
from werkzeug import routing
from werkzeug.exceptions import NotFound as WNotFound
from testtools.matchers import Equals
from zope.interface import implementer
from .common import SyncTestCase
from ..storage.http_common import get_content_type, CBOR_MIME_TYPE
@ -449,6 +452,27 @@ class CustomHTTPServerTests(SyncTestCase):
self.assertEqual(len(self._http_server.clock.getDelayedCalls()), 0)
@implementer(IReactorFromThreads)
class Reactor(Clock):
    """
    Fake reactor that supports time APIs and callFromThread.

    Advancing the clock also runs any callbacks scheduled via callFromThread.
    """
    def __init__(self):
        Clock.__init__(self)
        # thread-safe FIFO of (callable, args, kwargs) handed over from
        # other threads; drained on each advance()
        self._queue = Queue()

    def callFromThread(self, f, *args, **kwargs):
        # Queue the call rather than running it immediately, mimicking
        # the real reactor's hand-off to the reactor thread.
        self._queue.put((f, args, kwargs))

    def advance(self, *args, **kwargs):
        Clock.advance(self, *args, **kwargs)
        # run everything scheduled via callFromThread so far
        while not self._queue.empty():
            f, args, kwargs = self._queue.get()
            f(*args, **kwargs)
class HttpTestFixture(Fixture):
"""
Setup HTTP tests' infrastructure, the storage server and corresponding
@ -460,7 +484,7 @@ class HttpTestFixture(Fixture):
lambda pool: self.addCleanup(pool.closeCachedConnections)
)
self.addCleanup(StorageClient.stop_test_mode)
self.clock = Clock()
self.clock = Reactor()
self.tempdir = self.useFixture(TempDir())
# The global Cooperator used by Twisted (a) used by pull producers in
# twisted.web, (b) is driven by a real reactor. We want to push time
@ -475,7 +499,7 @@ class HttpTestFixture(Fixture):
self.storage_server = StorageServer(
self.tempdir.path, b"\x00" * 20, clock=self.clock
)
self.http_server = HTTPServer(self.storage_server, SWISSNUM_FOR_TEST)
self.http_server = HTTPServer(self.clock, self.storage_server, SWISSNUM_FOR_TEST)
self.treq = StubTreq(self.http_server.get_resource())
self.client = StorageClient(
DecodedURL.from_text("http://127.0.0.1"),
@ -501,13 +525,25 @@ class HttpTestFixture(Fixture):
# OK, no result yet, probably async HTTP endpoint handler, so advance
# time, flush treq, and try again:
for i in range(100):
for i in range(10_000):
self.clock.advance(0.001)
self.treq.flush()
if result:
break
# By putting the sleep at the end, tests that are completely
# synchronous and don't use threads will have already broken out of
# the loop, and so will finish without any sleeps. This allows them
# to run as quickly as possible.
#
# However, some tests do talk to APIs that use a thread pool on the
# backend, so we need to allow actual time to pass for those.
time.sleep(0.001)
if result:
return result[0]
if error:
error[0].raiseException()
raise RuntimeError(
"We expected given Deferred to have result already, but it wasn't. "
+ "This is probably a test design issue."

View File

@ -15,8 +15,10 @@ import six
import os, time, sys
import yaml
import json
from threading import current_thread
from twisted.trial import unittest
from twisted.internet import reactor
from foolscap.api import Violation, RemoteException
from allmydata.util import idlib, mathutil
@ -26,6 +28,7 @@ from allmydata.util import pollmixin
from allmydata.util import yamlutil
from allmydata.util import rrefutil
from allmydata.util.fileutil import EncryptedTemporaryFile
from allmydata.util.cputhreadpool import defer_to_thread
from allmydata.test.common_util import ReallyEqualMixin
from .no_network import fireNow, LocalWrapper
@ -553,6 +556,17 @@ class JSONBytes(unittest.TestCase):
o, cls=jsonbytes.AnyBytesJSONEncoder)),
expected,
)
self.assertEqual(
json.loads(jsonbytes.dumps(o, any_bytes=True)),
expected
)
def test_dumps_bytes_unicode_separators(self):
"""Unicode separators don't prevent the result from being bytes."""
result = jsonbytes.dumps_bytes([1, 2], separators=(u',', u':'))
self.assertIsInstance(result, bytes)
self.assertEqual(result, b"[1,2]")
class FakeGetVersion(object):
@ -588,3 +602,31 @@ class RrefUtilTests(unittest.TestCase):
)
self.assertEqual(result.version, "Default")
self.assertIdentical(result, rref)
class CPUThreadPool(unittest.TestCase):
    """Tests for cputhreadpool."""

    async def test_runs_in_thread(self):
        """The given function runs in a thread."""

        def f(*args, **kwargs):
            # Report the thread we ran on, plus the arguments we received.
            return current_thread(), args, kwargs

        main_thread_id = current_thread().ident
        deferred = defer_to_thread(reactor, f, 1, 3, key=4, value=5)

        # Record which thread the callback fires on, so we can verify that
        # callbacks run back on the main (reactor) thread:
        callback_thread_ident = []

        def note_callback_thread(value):
            callback_thread_ident.append(current_thread().ident)
            return value

        deferred.addCallback(note_callback_thread)

        thread, args, kwargs = await deferred

        # The callback ran on the main thread, while the function itself ran
        # on some other thread, with the arguments passed through unchanged:
        self.assertEqual(callback_thread_ident[0], main_thread_id)
        self.assertNotEqual(thread.ident, main_thread_id)
        self.assertEqual(args, (1, 3))
        self.assertEqual(kwargs, {"key": 4, "value": 5})

View File

@ -66,13 +66,15 @@ def write_config(tahoe_cfg, config):
"""
Write a configuration to a file.
:param FilePath tahoe_cfg: The path to which to write the config.
:param FilePath tahoe_cfg: The path to which to write the
config. The directories are created if they do not already exist.
:param ConfigParser config: The configuration to write.
:return: ``None``
"""
tmp = tahoe_cfg.temporarySibling()
tahoe_cfg.parent().makedirs(ignoreExistingDirectory=True)
# FilePath.open can only open files in binary mode which does not work
# with ConfigParser.write.
with open(tmp.path, "wt") as fp:
@ -80,7 +82,10 @@ def write_config(tahoe_cfg, config):
# Windows doesn't have atomic overwrite semantics for moveTo. Thus we end
# up slightly less than atomic.
if platform.isWindows():
try:
tahoe_cfg.remove()
except FileNotFoundError:
pass
tmp.moveTo(tahoe_cfg)
def validate_config(fname, cfg, valid_config):
@ -162,7 +167,7 @@ class ValidConfiguration(object):
def is_valid_item(self, section_name, item_name):
"""
:return: True if the given section name, ite name pair is valid, False
:return: True if the given section name, item_name pair is valid, False
otherwise.
"""
return (

View File

@ -0,0 +1,59 @@
"""
A global thread pool for CPU-intensive tasks.
Motivation:
* Certain tasks are blocking on CPU, and so should be run in a thread.
* The Twisted thread pool is used for operations that don't necessarily block
on CPU, like DNS lookups. CPU processing should not block DNS lookups!
* The number of threads should be fixed, and tied to the number of available
CPUs.
As a first pass, this uses ``os.cpu_count()`` to determine the max number of
threads. This may create too many threads, as it doesn't cover things like
scheduler affinity or cgroups, but that's not the end of the world.
"""
import os
from typing import TypeVar, Callable, cast
from functools import partial
import threading
from typing_extensions import ParamSpec
from twisted.python.threadpool import ThreadPool
from twisted.internet.defer import Deferred
from twisted.internet.threads import deferToThreadPool
from twisted.internet.interfaces import IReactorFromThreads
# The shared pool for CPU-intensive work.  maxthreads is tied to the CPU
# count so CPU-bound tasks don't oversubscribe the machine; os.cpu_count()
# may overcount under cgroups/affinity limits (see module docstring).
_CPU_THREAD_POOL = ThreadPool(minthreads=0, maxthreads=os.cpu_count(), name="TahoeCPU")
if hasattr(threading, "_register_atexit"):
    # This is a private API present in Python 3.8 or later, specifically
    # designed for thread pool shutdown. Since it's private, it might go away
    # at any point, so if it doesn't exist we still have a solution.
    threading._register_atexit(_CPU_THREAD_POOL.stop)  # type: ignore
else:
    # Daemon threads allow shutdown to happen without any explicit stopping of
    # threads. There are some bugs in old Python versions related to daemon
    # threads (fixed in subsequent CPython patch releases), but Python's own
    # thread pools use daemon threads in those versions so we're no worse off.
    _CPU_THREAD_POOL.threadFactory = partial(  # type: ignore
        _CPU_THREAD_POOL.threadFactory, daemon=True
    )
# NOTE: the threadFactory patch above must be in place before start(), so
# that any threads the pool spawns are created as daemons.
_CPU_THREAD_POOL.start()

# Type variables used to give defer_to_thread a precise signature below.
P = ParamSpec("P")
R = TypeVar("R")
def defer_to_thread(
    reactor: IReactorFromThreads, f: Callable[P, R], *args: P.args, **kwargs: P.kwargs
) -> Deferred[R]:
    """Run the function in a thread, return the result as a ``Deferred``."""
    # deferToThreadPool has no type annotations, so cast its result to the
    # precisely-typed Deferred our signature promises.
    return cast(
        Deferred[R],
        deferToThreadPool(reactor, _CPU_THREAD_POOL, f, *args, **kwargs),
    )
__all__ = ["defer_to_thread"]

View File

@ -9,7 +9,7 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from future.utils import PY2, PY3
from future.utils import PY2
if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
@ -108,14 +108,12 @@ def dumps_bytes(obj, *args, **kwargs):
UTF-8 encoded Unicode strings. If True, non-UTF-8 bytes are quoted for
human consumption.
"""
result = dumps(obj, *args, **kwargs)
if PY3:
result = result.encode("utf-8")
return result
return dumps(obj, *args, **kwargs).encode("utf-8")
# Re-export the stdlib deserializers so this module is drop-in compatible
# with the standard json module (decoding needs no bytes-specific handling):
loads = json.loads
load = json.load
__all__ = ["dumps", "loads"]
__all__ = ["dumps", "loads", "load"]