mirror of https://github.com/tahoe-lafs/tahoe-lafs.git
Merge remote-tracking branch 'origin/master' into 3902-listen-storage-http
This commit is contained in: commit 8179ea7738
47 docs/check_running.py (new file)
@@ -0,0 +1,47 @@
import psutil
import filelock


def can_spawn_tahoe(pidfile):
    """
    Determine if we can spawn a Tahoe-LAFS for the given pidfile. That
    pidfile may be deleted if it is stale.

    :param pathlib.Path pidfile: the file to check, that is the Path
        to "running.process" in a Tahoe-LAFS configuration directory

    :returns bool: True if we can spawn `tahoe run` here
    """
    lockpath = pidfile.parent / (pidfile.name + ".lock")
    with filelock.FileLock(lockpath):
        try:
            with pidfile.open("r") as f:
                pid, create_time = f.read().strip().split(" ", 1)
        except FileNotFoundError:
            return True

        # somewhat interesting: we have a pidfile
        pid = int(pid)
        create_time = float(create_time)

        try:
            proc = psutil.Process(pid)
            # most interesting case: there _is_ a process running at the
            # recorded PID -- but did it just happen to get that PID, or
            # is it the very same one that wrote the file?
            if create_time == proc.create_time():
                # _not_ stale! another instance is still running against
                # this configuration
                return False

        except psutil.NoSuchProcess:
            pass

        # the file is stale
        pidfile.unlink()
        return True


from pathlib import Path
print("can spawn?", can_spawn_tahoe(Path("running.process")))
@@ -395,8 +395,8 @@ Encoding
 General
 ~~~~~~~

-``GET /v1/version``
-!!!!!!!!!!!!!!!!!!!
+``GET /storage/v1/version``
+!!!!!!!!!!!!!!!!!!!!!!!!!!!

 Retrieve information about the version of the storage server.
 Information is returned as an encoded mapping.
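For orientation, a request to the renamed endpoint looks like the worked examples later in this document (the swissnum and response body here are placeholders, not real values)::

     GET /storage/v1/version
     Authorization: Tahoe-LAFS nurl-swissnum

     200 OK
     <encoded version mapping, as in the example below>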
@@ -409,14 +409,13 @@ For example::
     "tolerates-immutable-read-overrun": true,
     "delete-mutable-shares-with-zero-length-writev": true,
     "fills-holes-with-zero-bytes": true,
-    "prevents-read-past-end-of-share-data": true,
-    "gbs-anonymous-storage-url": "pb://...#v=1"
+    "prevents-read-past-end-of-share-data": true
   },
   "application-version": "1.13.0"
 }

-``PUT /v1/lease/:storage_index``
-!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+``PUT /storage/v1/lease/:storage_index``
+!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

 Either renew or create a new lease on the bucket addressed by ``storage_index``.
@@ -468,8 +467,8 @@ Immutable
 Writing
 ~~~~~~~

-``POST /v1/immutable/:storage_index``
-!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+``POST /storage/v1/immutable/:storage_index``
+!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

 Initialize an immutable storage index with some buckets.
 The buckets may have share data written to them once.
@@ -504,7 +503,7 @@ Handling repeat calls:
 Discussion
 ``````````

-We considered making this ``POST /v1/immutable`` instead.
+We considered making this ``POST /storage/v1/immutable`` instead.
 The motivation was to keep *storage index* out of the request URL.
 Request URLs have an elevated chance of being logged by something.
 We were concerned that having the *storage index* logged may increase some risks.
@@ -539,8 +538,8 @@ Rejected designs for upload secrets:
   it must contain randomness.
   Randomness means there is no need to have a secret per share, since adding share-specific content to randomness doesn't actually make the secret any better.

-``PATCH /v1/immutable/:storage_index/:share_number``
-!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+``PATCH /storage/v1/immutable/:storage_index/:share_number``
+!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

 Write data for the indicated share.
 The share number must belong to the storage index.
@@ -580,8 +579,8 @@ Responses:
   the response is ``CONFLICT``.
   At this point the only thing to do is abort the upload and start from scratch (see below).

-``PUT /v1/immutable/:storage_index/:share_number/abort``
-!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+``PUT /storage/v1/immutable/:storage_index/:share_number/abort``
+!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

 This cancels an *in-progress* upload.
@@ -616,8 +615,8 @@ From RFC 7231::
    PATCH method defined in [RFC5789]).


-``POST /v1/immutable/:storage_index/:share_number/corrupt``
-!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+``POST /storage/v1/immutable/:storage_index/:share_number/corrupt``
+!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

 Advise the server the data read from the indicated share was corrupt. The
 request body includes a human-meaningful text string with details about the
@@ -635,8 +634,8 @@ couldn't be found.
 Reading
 ~~~~~~~

-``GET /v1/immutable/:storage_index/shares``
-!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+``GET /storage/v1/immutable/:storage_index/shares``
+!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

 Retrieve a list (semantically, a set) indicating all shares available for the
 indicated storage index. For example::
@@ -645,8 +644,8 @@ indicated storage index. For example::

 An unknown storage index results in an empty list.

-``GET /v1/immutable/:storage_index/:share_number``
-!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+``GET /storage/v1/immutable/:storage_index/:share_number``
+!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

 Read a contiguous sequence of bytes from one share in one bucket.
 The response body is the raw share data (i.e., ``application/octet-stream``).
@@ -686,8 +685,8 @@ Mutable
 Writing
 ~~~~~~~

-``POST /v1/mutable/:storage_index/read-test-write``
-!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+``POST /storage/v1/mutable/:storage_index/read-test-write``
+!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

 General purpose read-test-and-write operation for mutable storage indexes.
 A mutable storage index is also called a "slot"
@@ -742,18 +741,18 @@ As a result, if there is no data at all, an empty bytestring is returned no matt
 Reading
 ~~~~~~~

-``GET /v1/mutable/:storage_index/shares``
-!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+``GET /storage/v1/mutable/:storage_index/shares``
+!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

 Retrieve a set indicating all shares available for the indicated storage index.
 For example (this is shown as a list, since it will be a list for JSON, but a set for CBOR)::

   [1, 5]

-``GET /v1/mutable/:storage_index/:share_number``
-!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+``GET /storage/v1/mutable/:storage_index/:share_number``
+!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

-Read data from the indicated mutable shares, just like ``GET /v1/immutable/:storage_index``
+Read data from the indicated mutable shares, just like ``GET /storage/v1/immutable/:storage_index``

 The ``Range`` header may be used to request exactly one ``bytes`` range, in which case the response code will be 206 (partial content).
 Interpretation and response behavior is as specified in RFC 7233 § 4.1.
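As a sketch of such a ranged read, in the style of the worked examples below (storage index, swissnum, and sizes are illustrative)::

     GET /storage/v1/mutable/BBBBBBBBBBBBBBBB/3
     Authorization: Tahoe-LAFS nurl-swissnum
     Range: bytes=0-15

     206 Partial Content
     Content-Range: bytes 0-15/48
     <the first 16 bytes of the share data>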
@@ -765,8 +764,8 @@ The resulting ``Content-Range`` header will be consistent with the returned data
 If the response to a query is an empty range, the ``NO CONTENT`` (204) response code will be used.


-``POST /v1/mutable/:storage_index/:share_number/corrupt``
-!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+``POST /storage/v1/mutable/:storage_index/:share_number/corrupt``
+!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

 Advise the server the data read from the indicated share was corrupt.
 Just like the immutable version.
@@ -779,7 +778,7 @@ Immutable Data

 1. Create a bucket for storage index ``AAAAAAAAAAAAAAAA`` to hold two immutable shares, discovering that share ``1`` was already uploaded::

-     POST /v1/immutable/AAAAAAAAAAAAAAAA
+     POST /storage/v1/immutable/AAAAAAAAAAAAAAAA
      Authorization: Tahoe-LAFS nurl-swissnum
      X-Tahoe-Authorization: lease-renew-secret efgh
      X-Tahoe-Authorization: lease-cancel-secret jjkl
@@ -792,7 +791,7 @@ Immutable Data

 #. Upload the content for immutable share ``7``::

-     PATCH /v1/immutable/AAAAAAAAAAAAAAAA/7
+     PATCH /storage/v1/immutable/AAAAAAAAAAAAAAAA/7
      Authorization: Tahoe-LAFS nurl-swissnum
      Content-Range: bytes 0-15/48
      X-Tahoe-Authorization: upload-secret xyzf
@@ -800,7 +799,7 @@ Immutable Data

      200 OK

-     PATCH /v1/immutable/AAAAAAAAAAAAAAAA/7
+     PATCH /storage/v1/immutable/AAAAAAAAAAAAAAAA/7
      Authorization: Tahoe-LAFS nurl-swissnum
      Content-Range: bytes 16-31/48
      X-Tahoe-Authorization: upload-secret xyzf
@@ -808,7 +807,7 @@ Immutable Data

      200 OK

-     PATCH /v1/immutable/AAAAAAAAAAAAAAAA/7
+     PATCH /storage/v1/immutable/AAAAAAAAAAAAAAAA/7
      Authorization: Tahoe-LAFS nurl-swissnum
      Content-Range: bytes 32-47/48
      X-Tahoe-Authorization: upload-secret xyzf
@@ -818,7 +817,7 @@ Immutable Data

 #. Download the content of the previously uploaded immutable share ``7``::

-     GET /v1/immutable/AAAAAAAAAAAAAAAA?share=7
+     GET /storage/v1/immutable/AAAAAAAAAAAAAAAA?share=7
      Authorization: Tahoe-LAFS nurl-swissnum
      Range: bytes=0-47
@@ -827,7 +826,7 @@ Immutable Data

 #. Renew the lease on all immutable shares in bucket ``AAAAAAAAAAAAAAAA``::

-     PUT /v1/lease/AAAAAAAAAAAAAAAA
+     PUT /storage/v1/lease/AAAAAAAAAAAAAAAA
      Authorization: Tahoe-LAFS nurl-swissnum
      X-Tahoe-Authorization: lease-cancel-secret jjkl
      X-Tahoe-Authorization: lease-renew-secret efgh
@@ -842,7 +841,7 @@ The special test vector of size 1 but empty bytes will only pass
 if there is no existing share,
 otherwise it will read a byte which won't match `b""`::

-     POST /v1/mutable/BBBBBBBBBBBBBBBB/read-test-write
+     POST /storage/v1/mutable/BBBBBBBBBBBBBBBB/read-test-write
      Authorization: Tahoe-LAFS nurl-swissnum
      X-Tahoe-Authorization: write-enabler abcd
      X-Tahoe-Authorization: lease-cancel-secret efgh
@@ -874,7 +873,7 @@ otherwise it will read a byte which won't match `b""`::

 #. Safely rewrite the contents of a known version of mutable share number ``3`` (or fail)::

-     POST /v1/mutable/BBBBBBBBBBBBBBBB/read-test-write
+     POST /storage/v1/mutable/BBBBBBBBBBBBBBBB/read-test-write
      Authorization: Tahoe-LAFS nurl-swissnum
      X-Tahoe-Authorization: write-enabler abcd
      X-Tahoe-Authorization: lease-cancel-secret efgh
@@ -906,14 +905,14 @@ otherwise it will read a byte which won't match `b""`::

 #. Download the contents of share number ``3``::

-     GET /v1/mutable/BBBBBBBBBBBBBBBB?share=3&offset=0&size=10
+     GET /storage/v1/mutable/BBBBBBBBBBBBBBBB?share=3&offset=0&size=10
      Authorization: Tahoe-LAFS nurl-swissnum

      <complete 16 bytes of previously uploaded data>

 #. Renew the lease on previously uploaded mutable share in slot ``BBBBBBBBBBBBBBBB``::

-     PUT /v1/lease/BBBBBBBBBBBBBBBB
+     PUT /storage/v1/lease/BBBBBBBBBBBBBBBB
      Authorization: Tahoe-LAFS nurl-swissnum
      X-Tahoe-Authorization: lease-cancel-secret efgh
      X-Tahoe-Authorization: lease-renew-secret ijkl
@@ -124,6 +124,35 @@ Tahoe-LAFS.
 .. _magic wormhole: https://magic-wormhole.io/


+Multiple Instances
+------------------
+
+Running multiple instances against the same configuration directory isn't supported.
+This will lead to undefined behavior and could corrupt the configuration or state.
+
+We attempt to avoid this situation with a "pidfile"-style file in the config directory called ``running.process``.
+There may be a parallel file called ``running.process.lock`` in existence.
+
+The ``.lock`` file exists to make sure only one process modifies ``running.process`` at once.
+The lock file is managed by the `filelock <https://pypi.org/project/filelock/>`_ library.
+If you wish to make use of ``running.process`` for any reason you should also lock it and follow the semantics of filelock.
+
+If ``running.process`` exists then it contains the PID and the creation-time of the process.
+When no such file exists, there is no other process running on this configuration.
+If there is a ``running.process`` file, it may be a leftover file or it may indicate that another process is running against this config.
+To tell the difference, determine if the PID in the file exists currently.
+If it does, check the creation-time of the process versus the one in the file.
+If these match, there is another process currently running and using this config.
+Otherwise, the file is stale -- it should be removed before starting Tahoe-LAFS.
+
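For illustration (the values below are made up), ``running.process`` holds a single line of the form ``<pid> <creation-time>``::

     4321 1654298870.1353035

where the creation-time is the POSIX timestamp that ``psutil`` reports for the process, so an equality check can distinguish a reused PID from the original writer.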
+Some example Python code to check the above situations:
+
+.. literalinclude:: check_running.py
+
+
 A note about small grids
 ------------------------
@@ -103,11 +103,8 @@ Version 1

 The hash component of a version 1 NURL differs in three ways from the prior version.

-1. The hash function used is SHA3-224 instead of SHA1.
-   The security of SHA1 `continues to be eroded`_.
-   Contrariwise SHA3 is currently the most recent addition to the SHA family by NIST.
-   The 224 bit instance is chosen to keep the output short and because it offers greater collision resistance than SHA1 was thought to offer even at its inception
-   (prior to security research showing actual collision resistance is lower).
+1. The hash function used is SHA-256, to match RFC 7469.
+   The security of SHA1 `continues to be eroded`_; Latacora recommends `SHA-2`_.
 2. The hash is computed over the certificate's SPKI instead of the whole certificate.
    This allows certificate re-generation so long as the public key remains the same.
    This is useful to allow contact information to be updated or extension of validity period.
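A sketch of the item-1 hash computation, using the widely available ``cryptography`` package (an illustration of the scheme, not necessarily the helper Tahoe-LAFS itself uses; the certificate path is a placeholder)::

     from cryptography import x509
     from cryptography.hazmat.primitives import serialization
     import hashlib

     pem_bytes = open("server.pem", "rb").read()       # the server's certificate
     cert = x509.load_pem_x509_certificate(pem_bytes)
     spki = cert.public_key().public_bytes(
         serialization.Encoding.DER,
         serialization.PublicFormat.SubjectPublicKeyInfo,
     )
     digest = hashlib.sha256(spki).digest()            # the value the NURL hash commits to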
@@ -140,7 +137,8 @@ Examples
 * ``pb://azEu8vlRpnEeYm0DySQDeNY3Z2iJXHC_bsbaAw@localhost:47877/64i4aokv4ej#v=1``

 .. _`continues to be eroded`: https://en.wikipedia.org/wiki/SHA-1#Cryptanalysis_and_validation
-.. _`explored by the web community`: https://www.imperialviolet.org/2011/05/04/pinning.html
+.. _`SHA-2`: https://latacora.micro.blog/2018/04/03/cryptographic-right-answers.html
+.. _`explored by the web community`: https://www.rfc-editor.org/rfc/rfc7469
 .. _Foolscap: https://github.com/warner/foolscap

 .. [1] ``foolscap.furl.decode_furl`` is taken as the canonical definition of the syntax of a fURL.
@@ -264,3 +264,18 @@ the "tahoe-conf" file for notes about configuration and installing these
 plugins into a Munin environment.

 .. _Munin: http://munin-monitoring.org/
+
+
+Scraping Stats Values in OpenMetrics Format
+===========================================
+
+Time Series DataBase (TSDB) software like Prometheus_ and VictoriaMetrics_ can
+parse statistics in OpenMetrics_ format from a URL such as
+http://localhost:3456/statistics?t=openmetrics. Software like Grafana_ can
+then be used to graph and alert on these numbers. You can find a
+pre-configured dashboard for Grafana at
+https://grafana.com/grafana/dashboards/16894-tahoe-lafs/.
+
+.. _OpenMetrics: https://openmetrics.io/
+.. _Prometheus: https://prometheus.io/
+.. _VictoriaMetrics: https://victoriametrics.com/
+.. _Grafana: https://grafana.com/
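As a quick way to eyeball the endpoint (assuming a node whose web port listens on ``localhost:3456``, as in the URL above), a minimal Python fetch::

     import urllib.request

     url = "http://localhost:3456/statistics?t=openmetrics"
     with urllib.request.urlopen(url) as response:
         # OpenMetrics is a line-oriented text format; show the first few lines.
         for line in response.read().decode("utf-8").splitlines()[:10]:
             print(line)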
1 newsfragments/3786.minor (new file)
@@ -0,0 +1 @@
Added re-structured text documentation for the OpenMetrics format statistics endpoint.
0 newsfragments/3904.minor (new empty file)
0 newsfragments/3915.minor (new empty file)
10 newsfragments/3926.incompat (new file)
@@ -0,0 +1,10 @@
Record both the PID and the process creation-time:

a new kind of pidfile in `running.process` records both
the PID and the creation-time of the process. This facilitates
automatic detection of a "stale" pidfile whose PID happens to
belong to some other currently-running process. If the recorded
creation-time matches the creation-time of the running process,
then it is a still-running `tahoe run` process. Otherwise, the
file is stale.

The `twistd.pid` file is no longer present.
4 setup.py
@@ -138,6 +138,10 @@ install_requires = [
     "treq",
     "cbor2",
     "pycddl",
+
+    # for pid-file support
+    "psutil",
+    "filelock",
 ]

 setup_requires = [
@@ -694,3 +694,24 @@ class Encoder(object):
         return self.uri_extension_data
     def get_uri_extension_hash(self):
         return self.uri_extension_hash
+
+    def get_uri_extension_size(self):
+        """
+        Calculate the size of the URI extension that gets written at the end of
+        immutables.
+
+        This may be done earlier than actual encoding, so e.g. we might not
+        know the crypttext hashes, but that's fine for our purposes since we
+        only care about the length.
+        """
+        params = self.uri_extension_data.copy()
+        params["crypttext_hash"] = b"\x00" * hashutil.CRYPTO_VAL_SIZE
+        params["crypttext_root_hash"] = b"\x00" * hashutil.CRYPTO_VAL_SIZE
+        params["share_root_hash"] = b"\x00" * hashutil.CRYPTO_VAL_SIZE
+        assert params.keys() == {
+            "codec_name", "codec_params", "size", "segment_size", "num_segments",
+            "needed_shares", "total_shares", "tail_codec_params",
+            "crypttext_hash", "crypttext_root_hash", "share_root_hash"
+        }, params.keys()
+        uri_extension = uri.pack_extension(params)
+        return len(uri_extension)
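The placeholder trick above works because the packed extension encodes each value by its length, so any fixed-size stand-in yields the same encoded size as the real hash. A self-contained sketch of the idea (``pack`` below is a hypothetical stand-in, not Tahoe's actual ``uri.pack_extension``)::

     import os

     def pack(params):  # hypothetical stand-in for uri.pack_extension
         return b"".join(
             b"%s:%d:%s," % (k.encode("ascii"), len(v), v)
             for k, v in sorted(params.items())
         )

     real = pack({"crypttext_hash": os.urandom(32)})
     placeholder = pack({"crypttext_hash": b"\x00" * 32})
     assert len(real) == len(placeholder)  # size depends only on field lengths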
@@ -19,6 +19,7 @@ from allmydata.util import mathutil, observer, pipeline, log
 from allmydata.util.assertutil import precondition
 from allmydata.storage.server import si_b2a

+
 class LayoutInvalid(Exception):
     """ There is something wrong with these bytes so they can't be
     interpreted as the kind of immutable file that I know how to download."""
@@ -90,7 +91,7 @@ FORCE_V2 = False # set briefly by unit tests to make small-sized V2 shares

 def make_write_bucket_proxy(rref, server,
                             data_size, block_size, num_segments,
-                            num_share_hashes, uri_extension_size_max):
+                            num_share_hashes, uri_extension_size):
     # Use layout v1 for small files, so they'll be readable by older versions
     # (<tahoe-1.3.0). Use layout v2 for large files; they'll only be readable
     # by tahoe-1.3.0 or later.
@@ -99,11 +100,11 @@ def make_write_bucket_proxy(rref, server,
             raise FileTooLargeError
         wbp = WriteBucketProxy(rref, server,
                                data_size, block_size, num_segments,
-                               num_share_hashes, uri_extension_size_max)
+                               num_share_hashes, uri_extension_size)
     except FileTooLargeError:
         wbp = WriteBucketProxy_v2(rref, server,
                                   data_size, block_size, num_segments,
-                                  num_share_hashes, uri_extension_size_max)
+                                  num_share_hashes, uri_extension_size)
     return wbp

 @implementer(IStorageBucketWriter)
@@ -112,20 +113,20 @@ class WriteBucketProxy(object):
     fieldstruct = ">L"

     def __init__(self, rref, server, data_size, block_size, num_segments,
-                 num_share_hashes, uri_extension_size_max, pipeline_size=50000):
+                 num_share_hashes, uri_extension_size, pipeline_size=50000):
         self._rref = rref
         self._server = server
         self._data_size = data_size
         self._block_size = block_size
         self._num_segments = num_segments
+        self._written_bytes = 0

         effective_segments = mathutil.next_power_of_k(num_segments,2)
         self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
         # how many share hashes are included in each share? This will be
         # about ln2(num_shares).
         self._share_hashtree_size = num_share_hashes * (2+HASH_SIZE)
-        # we commit to not sending a uri extension larger than this
-        self._uri_extension_size_max = uri_extension_size_max
+        self._uri_extension_size = uri_extension_size

         self._create_offsets(block_size, data_size)
@@ -137,7 +138,7 @@ class WriteBucketProxy(object):

     def get_allocated_size(self):
         return (self._offsets['uri_extension'] + self.fieldsize +
-                self._uri_extension_size_max)
+                self._uri_extension_size)

     def _create_offsets(self, block_size, data_size):
         if block_size >= 2**32 or data_size >= 2**32:
@@ -195,6 +196,14 @@ class WriteBucketProxy(object):
         return self._write(offset, data)

     def put_crypttext_hashes(self, hashes):
+        # plaintext_hash_tree precedes crypttext_hash_tree. It is not used, and
+        # so is not explicitly written, but we need to write everything, so
+        # fill it in with nulls.
+        d = self._write(self._offsets['plaintext_hash_tree'], b"\x00" * self._segment_hash_size)
+        d.addCallback(lambda _: self._really_put_crypttext_hashes(hashes))
+        return d
+
+    def _really_put_crypttext_hashes(self, hashes):
         offset = self._offsets['crypttext_hash_tree']
         assert isinstance(hashes, list)
         data = b"".join(hashes)
@@ -233,8 +242,7 @@ class WriteBucketProxy(object):
     def put_uri_extension(self, data):
         offset = self._offsets['uri_extension']
         assert isinstance(data, bytes)
-        precondition(len(data) <= self._uri_extension_size_max,
-                     len(data), self._uri_extension_size_max)
+        precondition(len(data) == self._uri_extension_size)
         length = struct.pack(self.fieldstruct, len(data))
         return self._write(offset, length+data)
@@ -244,11 +252,12 @@ class WriteBucketProxy(object):
         # would reduce the foolscap CPU overhead per share, but wouldn't
         # reduce the number of round trips, so it might not be worth the
         # effort.

+        self._written_bytes += len(data)
         return self._pipeline.add(len(data),
                                   self._rref.callRemote, "write", offset, data)

     def close(self):
+        assert self._written_bytes == self.get_allocated_size(), f"{self._written_bytes} != {self.get_allocated_size()}"
         d = self._pipeline.add(0, self._rref.callRemote, "close")
         d.addCallback(lambda ign: self._pipeline.flush())
         return d
@@ -303,8 +312,6 @@ class WriteBucketProxy_v2(WriteBucketProxy):
 @implementer(IStorageBucketReader)
 class ReadBucketProxy(object):

-    MAX_UEB_SIZE = 2000 # actual size is closer to 419, but varies by a few bytes
-
     def __init__(self, rref, server, storage_index):
         self._rref = rref
         self._server = server
@@ -332,11 +339,6 @@ class ReadBucketProxy(object):
         # TODO: for small shares, read the whole bucket in _start()
         d = self._fetch_header()
         d.addCallback(self._parse_offsets)
-        # XXX The following two callbacks implement a slightly faster/nicer
-        # way to get the ueb and sharehashtree, but it requires that the
-        # storage server be >= v1.3.0.
-        # d.addCallback(self._fetch_sharehashtree_and_ueb)
-        # d.addCallback(self._parse_sharehashtree_and_ueb)
         def _fail_waiters(f):
             self._ready.fire(f)
         def _notify_waiters(result):
@@ -381,29 +383,6 @@ class ReadBucketProxy(object):
             self._offsets[field] = offset
         return self._offsets

-    def _fetch_sharehashtree_and_ueb(self, offsets):
-        sharehashtree_size = offsets['uri_extension'] - offsets['share_hashes']
-        return self._read(offsets['share_hashes'],
-                          self.MAX_UEB_SIZE+sharehashtree_size)
-
-    def _parse_sharehashtree_and_ueb(self, data):
-        sharehashtree_size = self._offsets['uri_extension'] - self._offsets['share_hashes']
-        if len(data) < sharehashtree_size:
-            raise LayoutInvalid("share hash tree truncated -- should have at least %d bytes -- not %d" % (sharehashtree_size, len(data)))
-        if sharehashtree_size % (2+HASH_SIZE) != 0:
-            raise LayoutInvalid("share hash tree malformed -- should have an even multiple of %d bytes -- not %d" % (2+HASH_SIZE, sharehashtree_size))
-        self._share_hashes = []
-        for i in range(0, sharehashtree_size, 2+HASH_SIZE):
-            hashnum = struct.unpack(">H", data[i:i+2])[0]
-            hashvalue = data[i+2:i+2+HASH_SIZE]
-            self._share_hashes.append( (hashnum, hashvalue) )
-
-        i = self._offsets['uri_extension']-self._offsets['share_hashes']
-        if len(data) < i+self._fieldsize:
-            raise LayoutInvalid("not enough bytes to encode URI length -- should be at least %d bytes long, not %d " % (i+self._fieldsize, len(data),))
-        length = struct.unpack(self._fieldstruct, data[i:i+self._fieldsize])[0]
-        self._ueb_data = data[i+self._fieldsize:i+self._fieldsize+length]
-
     def _get_block_data(self, unused, blocknum, blocksize, thisblocksize):
         offset = self._offsets['data'] + blocknum * blocksize
         return self._read(offset, thisblocksize)
@@ -446,20 +425,18 @@ class ReadBucketProxy(object):
         else:
             return defer.succeed([])

-    def _get_share_hashes(self, unused=None):
-        if hasattr(self, '_share_hashes'):
-            return self._share_hashes
-        return self._get_share_hashes_the_old_way()
-
     def get_share_hashes(self):
         d = self._start_if_needed()
         d.addCallback(self._get_share_hashes)
         return d

-    def _get_share_hashes_the_old_way(self):
+    def _get_share_hashes(self, _ignore):
         """ Tahoe storage servers < v1.3.0 would return an error if you tried
         to read past the end of the share, so we need to use the offset and
-        read just that much."""
+        read just that much.
+
+        HTTP-based storage protocol also doesn't like reading past the end.
+        """
         offset = self._offsets['share_hashes']
         size = self._offsets['uri_extension'] - offset
         if size % (2+HASH_SIZE) != 0:
@@ -477,32 +454,29 @@ class ReadBucketProxy(object):
         d.addCallback(_unpack_share_hashes)
         return d

-    def _get_uri_extension_the_old_way(self, unused=None):
+    def _get_uri_extension(self, unused=None):
         """ Tahoe storage servers < v1.3.0 would return an error if you tried
         to read past the end of the share, so we need to fetch the UEB size
-        and then read just that much."""
+        and then read just that much.
+
+        HTTP-based storage protocol also doesn't like reading past the end.
+        """
         offset = self._offsets['uri_extension']
         d = self._read(offset, self._fieldsize)
         def _got_length(data):
             if len(data) != self._fieldsize:
                 raise LayoutInvalid("not enough bytes to encode URI length -- should be %d bytes long, not %d " % (self._fieldsize, len(data),))
             length = struct.unpack(self._fieldstruct, data)[0]
-            if length >= 2**31:
-                # URI extension blocks are around 419 bytes long, so this
-                # must be corrupted. Anyway, the foolscap interface schema
-                # for "read" will not allow >= 2**31 bytes length.
+            if length >= 2000:
+                # URI extension blocks are around 419 bytes long; in previous
+                # versions of the code 1000 was used as a default catchall. So
+                # 2000 or more must be corrupted.
                 raise RidiculouslyLargeURIExtensionBlock(length)

             return self._read(offset+self._fieldsize, length)
         d.addCallback(_got_length)
         return d

-    def _get_uri_extension(self, unused=None):
-        if hasattr(self, '_ueb_data'):
-            return self._ueb_data
-        else:
-            return self._get_uri_extension_the_old_way()
-
     def get_uri_extension(self):
         d = self._start_if_needed()
         d.addCallback(self._get_uri_extension)
@@ -242,31 +242,26 @@ class UploadResults(object):
     def get_verifycapstr(self):
         return self._verifycapstr

-# our current uri_extension is 846 bytes for small files, a few bytes
-# more for larger ones (since the filesize is encoded in decimal in a
-# few places). Ask for a little bit more just in case we need it. If
-# the extension changes size, we can change EXTENSION_SIZE to
-# allocate a more accurate amount of space.
-EXTENSION_SIZE = 1000
-# TODO: actual extensions are closer to 419 bytes, so we can probably lower
-# this.

 def pretty_print_shnum_to_servers(s):
     return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.items() ])


 class ServerTracker(object):
     def __init__(self, server,
                  sharesize, blocksize, num_segments, num_share_hashes,
                  storage_index,
-                 bucket_renewal_secret, bucket_cancel_secret):
+                 bucket_renewal_secret, bucket_cancel_secret,
+                 uri_extension_size):
         self._server = server
         self.buckets = {} # k: shareid, v: IRemoteBucketWriter
         self.sharesize = sharesize
+        self.uri_extension_size = uri_extension_size

         wbp = layout.make_write_bucket_proxy(None, None, sharesize,
                                              blocksize, num_segments,
                                              num_share_hashes,
-                                             EXTENSION_SIZE)
+                                             uri_extension_size)
         self.wbp_class = wbp.__class__ # to create more of them
         self.allocated_size = wbp.get_allocated_size()
         self.blocksize = blocksize
@@ -314,7 +309,7 @@ class ServerTracker(object):
                                 self.blocksize,
                                 self.num_segments,
                                 self.num_share_hashes,
-                                EXTENSION_SIZE)
+                                self.uri_extension_size)
             b[sharenum] = bp
         self.buckets.update(b)
         return (alreadygot, set(b.keys()))
@@ -487,7 +482,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
     def get_shareholders(self, storage_broker, secret_holder,
                          storage_index, share_size, block_size,
                          num_segments, total_shares, needed_shares,
-                         min_happiness):
+                         min_happiness, uri_extension_size):
         """
         @return: (upload_trackers, already_serverids), where upload_trackers
           is a set of ServerTracker instances that have agreed to hold
@@ -529,7 +524,8 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
         # figure out how much space to ask for
         wbp = layout.make_write_bucket_proxy(None, None,
                                              share_size, 0, num_segments,
-                                             num_share_hashes, EXTENSION_SIZE)
+                                             num_share_hashes,
+                                             uri_extension_size)
         allocated_size = wbp.get_allocated_size()

         # decide upon the renewal/cancel secrets, to include them in the
@@ -554,7 +550,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
         def _create_server_tracker(server, renew, cancel):
             return ServerTracker(
                 server, share_size, block_size, num_segments, num_share_hashes,
-                storage_index, renew, cancel,
+                storage_index, renew, cancel, uri_extension_size
             )

         readonly_trackers, write_trackers = self._create_trackers(
@@ -1326,7 +1322,8 @@ class CHKUploader(object):
         d = server_selector.get_shareholders(storage_broker, secret_holder,
                                              storage_index,
                                              share_size, block_size,
-                                             num_segments, n, k, desired)
+                                             num_segments, n, k, desired,
+                                             encoder.get_uri_extension_size())
         def _done(res):
             self._server_selection_elapsed = time.time() - server_selection_started
             return res
@@ -47,11 +47,6 @@ if _default_nodedir:
     NODEDIR_HELP += " [default for most commands: " + quote_local_unicode_path(_default_nodedir) + "]"


-# XXX all this 'dispatch' stuff needs to be unified + fixed up
-_control_node_dispatch = {
-    "run": tahoe_run.run,
-}
-
 process_control_commands = [
     ("run", None, tahoe_run.RunOptions, "run a node without daemonizing"),
 ] # type: SubCommands
@@ -195,6 +190,7 @@ def parse_or_exit(config, argv, stdout, stderr):
     return config


 def dispatch(config,
+             reactor,
              stdin=sys.stdin, stdout=sys.stdout, stderr=sys.stderr):
     command = config.subCommand
     so = config.subOptions
@@ -206,8 +202,8 @@ def dispatch(config,

     if command in create_dispatch:
         f = create_dispatch[command]
-    elif command in _control_node_dispatch:
-        f = _control_node_dispatch[command]
+    elif command == "run":
+        f = lambda config: tahoe_run.run(reactor, config)
     elif command in debug.dispatch:
         f = debug.dispatch[command]
     elif command in admin.dispatch:
@@ -361,7 +357,7 @@ def _run_with_reactor(reactor, config, argv, stdout, stderr):
         stderr,
     )
     d.addCallback(_maybe_enable_eliot_logging, reactor)
-    d.addCallback(dispatch, stdout=stdout, stderr=stderr)
+    d.addCallback(dispatch, reactor, stdout=stdout, stderr=stderr)
     def _show_exception(f):
         # when task.react() notices a non-SystemExit exception, it does
         # log.err() with the failure and then exits with rc=1. We want this
@@ -19,6 +19,7 @@ import os, sys
 from allmydata.scripts.common import BasedirOptions
 from twisted.scripts import twistd
 from twisted.python import usage
+from twisted.python.filepath import FilePath
 from twisted.python.reflect import namedAny
 from twisted.internet.defer import maybeDeferred
 from twisted.application.service import Service
@@ -27,6 +28,13 @@ from allmydata.scripts.default_nodedir import _default_nodedir
 from allmydata.util.encodingutil import listdir_unicode, quote_local_unicode_path
 from allmydata.util.configutil import UnknownConfigError
 from allmydata.util.deferredutil import HookMixin
+from allmydata.util.pid import (
+    parse_pidfile,
+    check_pid_process,
+    cleanup_pidfile,
+    ProcessInTheWay,
+    InvalidPidFile,
+)
 from allmydata.storage.crawler import (
     MigratePickleFileError,
 )
@@ -35,35 +43,34 @@ from allmydata.node import (
     PrivacyError,
 )


 def get_pidfile(basedir):
     """
     Returns the path to the PID file.
     :param basedir: the node's base directory
     :returns: the path to the PID file
     """
-    return os.path.join(basedir, u"twistd.pid")
+    return os.path.join(basedir, u"running.process")


 def get_pid_from_pidfile(pidfile):
     """
     Tries to read and return the PID stored in the node's PID file
-    (twistd.pid).

     :param pidfile: try to read this PID file
     :returns: A numeric PID on success, ``None`` if PID file absent or
         inaccessible, ``-1`` if PID file invalid.
     """
     try:
-        with open(pidfile, "r") as f:
-            pid = f.read()
+        pid, _ = parse_pidfile(pidfile)
     except EnvironmentError:
         return None
-
-    try:
-        pid = int(pid)
-    except ValueError:
+    except InvalidPidFile:
         return -1

     return pid


 def identify_node_type(basedir):
     """
     :return unicode: None or one of: 'client' or 'introducer'.
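A sketch of how the pid helpers above combine; the call shapes are inferred from this diff and should be treated as assumptions rather than documented API::

     from allmydata.util.pid import parse_pidfile, InvalidPidFile

     try:
         pid, create_time = parse_pidfile(pidfile)   # inferred: returns (pid, creation-time)
     except EnvironmentError:
         pass   # no pidfile at all: nothing is running here
     except InvalidPidFile:
         pass   # malformed pidfile: surface an error to the operator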
@@ -206,7 +213,7 @@ class DaemonizeTahoeNodePlugin(object):
         return DaemonizeTheRealService(self.nodetype, self.basedir, so)


-def run(config, runApp=twistd.runApp):
+def run(reactor, config, runApp=twistd.runApp):
     """
     Runs a Tahoe-LAFS node in the foreground.
@@ -227,10 +234,15 @@ def run(reactor, config, runApp=twistd.runApp):
         print("%s is not a recognizable node directory" % quoted_basedir, file=err)
         return 1

-    twistd_args = ["--nodaemon", "--rundir", basedir]
+    twistd_args = [
+        # ensure twistd machinery does not daemonize.
+        "--nodaemon",
+        "--rundir", basedir,
+    ]
     if sys.platform != "win32":
-        pidfile = get_pidfile(basedir)
-        twistd_args.extend(["--pidfile", pidfile])
+        # turn off Twisted's pid-file to use our own -- but not on
+        # windows, because twistd doesn't know about pidfiles there
+        twistd_args.extend(["--pidfile", None])
     twistd_args.extend(config.twistd_args)
     twistd_args.append("DaemonizeTahoeNode") # point at our DaemonizeTahoeNodePlugin
@@ -246,10 +258,18 @@ def run(reactor, config, runApp=twistd.runApp):
         return 1
     twistd_config.loadedPlugins = {"DaemonizeTahoeNode": DaemonizeTahoeNodePlugin(nodetype, basedir)}

-    # handle invalid PID file (twistd might not start otherwise)
-    if sys.platform != "win32" and get_pid_from_pidfile(pidfile) == -1:
-        print("found invalid PID file in %s - deleting it" % basedir, file=err)
-        os.remove(pidfile)
+    # our own pid-style file contains PID and process creation time
+    pidfile = FilePath(get_pidfile(config['basedir']))
+    try:
+        check_pid_process(pidfile)
+    except (ProcessInTheWay, InvalidPidFile) as e:
+        print("ERROR: {}".format(e), file=err)
+        return 1
+    else:
+        reactor.addSystemEventTrigger(
+            "after", "shutdown",
+            lambda: cleanup_pidfile(pidfile)
+        )

     # We always pass --nodaemon so twistd.runApp does not daemonize.
     print("running node in %s" % (quoted_basedir,), file=out)
@@ -392,7 +392,7 @@ class StorageClientGeneral(object):
         """
         Return the version metadata for the server.
         """
-        url = self._client.relative_url("/v1/version")
+        url = self._client.relative_url("/storage/v1/version")
        response = yield self._client.request("GET", url)
        decoded_response = yield _decode_cbor(response, _SCHEMAS["get_version"])
        returnValue(decoded_response)
@@ -408,7 +408,7 @@ class StorageClientGeneral(object):
         Otherwise a new lease is added.
         """
         url = self._client.relative_url(
-            "/v1/lease/{}".format(_encode_si(storage_index))
+            "/storage/v1/lease/{}".format(_encode_si(storage_index))
         )
         response = yield self._client.request(
             "PUT",
@@ -457,7 +457,9 @@ def read_share_chunk(
         always provided by the current callers.
     """
     url = client.relative_url(
-        "/v1/{}/{}/{}".format(share_type, _encode_si(storage_index), share_number)
+        "/storage/v1/{}/{}/{}".format(
+            share_type, _encode_si(storage_index), share_number
+        )
     )
     response = yield client.request(
         "GET",
@@ -518,7 +520,7 @@ async def advise_corrupt_share(
 ):
     assert isinstance(reason, str)
     url = client.relative_url(
-        "/v1/{}/{}/{}/corrupt".format(
+        "/storage/v1/{}/{}/{}/corrupt".format(
             share_type, _encode_si(storage_index), share_number
         )
     )
@@ -563,7 +565,9 @@ class StorageClientImmutables(object):
         Result fires when creating the storage index succeeded, if creating the
         storage index failed the result will fire with an exception.
         """
-        url = self._client.relative_url("/v1/immutable/" + _encode_si(storage_index))
+        url = self._client.relative_url(
+            "/storage/v1/immutable/" + _encode_si(storage_index)
+        )
         message = {"share-numbers": share_numbers, "allocated-size": allocated_size}

         response = yield self._client.request(
@@ -588,7 +592,9 @@ class StorageClientImmutables(object):
     ) -> Deferred[None]:
         """Abort the upload."""
         url = self._client.relative_url(
-            "/v1/immutable/{}/{}/abort".format(_encode_si(storage_index), share_number)
+            "/storage/v1/immutable/{}/{}/abort".format(
+                _encode_si(storage_index), share_number
+            )
         )
         response = yield self._client.request(
             "PUT",
@@ -620,7 +626,9 @@ class StorageClientImmutables(object):
         been uploaded.
         """
         url = self._client.relative_url(
-            "/v1/immutable/{}/{}".format(_encode_si(storage_index), share_number)
+            "/storage/v1/immutable/{}/{}".format(
+                _encode_si(storage_index), share_number
+            )
         )
         response = yield self._client.request(
             "PATCH",
@@ -668,7 +676,7 @@ class StorageClientImmutables(object):
         Return the set of shares for a given storage index.
         """
         url = self._client.relative_url(
-            "/v1/immutable/{}/shares".format(_encode_si(storage_index))
+            "/storage/v1/immutable/{}/shares".format(_encode_si(storage_index))
         )
         response = yield self._client.request(
             "GET",
@@ -774,7 +782,7 @@ class StorageClientMutables:
         are done and if they are valid the writes are done.
         """
         url = self._client.relative_url(
-            "/v1/mutable/{}/read-test-write".format(_encode_si(storage_index))
+            "/storage/v1/mutable/{}/read-test-write".format(_encode_si(storage_index))
         )
         message = {
             "test-write-vectors": {
@@ -817,7 +825,7 @@ class StorageClientMutables:
         List the share numbers for a given storage index.
         """
         url = self._client.relative_url(
-            "/v1/mutable/{}/shares".format(_encode_si(storage_index))
+            "/storage/v1/mutable/{}/shares".format(_encode_si(storage_index))
         )
         response = await self._client.request("GET", url)
         if response.code == http.OK:
@@ -551,7 +551,7 @@ class HTTPServer(object):

     ##### Generic APIs #####

-    @_authorized_route(_app, set(), "/v1/version", methods=["GET"])
+    @_authorized_route(_app, set(), "/storage/v1/version", methods=["GET"])
     def version(self, request, authorization):
         """Return version information."""
         return self._send_encoded(request, self._storage_server.get_version())
@@ -561,7 +561,7 @@ class HTTPServer(object):
     @_authorized_route(
         _app,
         {Secrets.LEASE_RENEW, Secrets.LEASE_CANCEL, Secrets.UPLOAD},
-        "/v1/immutable/<storage_index:storage_index>",
+        "/storage/v1/immutable/<storage_index:storage_index>",
         methods=["POST"],
     )
     def allocate_buckets(self, request, authorization, storage_index):
@@ -597,7 +597,7 @@ class HTTPServer(object):
     @_authorized_route(
         _app,
         {Secrets.UPLOAD},
-        "/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>/abort",
+        "/storage/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>/abort",
         methods=["PUT"],
     )
     def abort_share_upload(self, request, authorization, storage_index, share_number):
@@ -628,7 +628,7 @@ class HTTPServer(object):
     @_authorized_route(
         _app,
         {Secrets.UPLOAD},
-        "/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>",
+        "/storage/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>",
         methods=["PATCH"],
     )
     def write_share_data(self, request, authorization, storage_index, share_number):
@@ -671,7 +671,7 @@ class HTTPServer(object):
     @_authorized_route(
         _app,
         set(),
-        "/v1/immutable/<storage_index:storage_index>/shares",
+        "/storage/v1/immutable/<storage_index:storage_index>/shares",
         methods=["GET"],
     )
     def list_shares(self, request, authorization, storage_index):
@@ -684,7 +684,7 @@ class HTTPServer(object):
     @_authorized_route(
         _app,
         set(),
-        "/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>",
+        "/storage/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>",
         methods=["GET"],
     )
     def read_share_chunk(self, request, authorization, storage_index, share_number):
@@ -700,7 +700,7 @@ class HTTPServer(object):
     @_authorized_route(
         _app,
         {Secrets.LEASE_RENEW, Secrets.LEASE_CANCEL},
-        "/v1/lease/<storage_index:storage_index>",
+        "/storage/v1/lease/<storage_index:storage_index>",
         methods=["PUT"],
     )
     def add_or_renew_lease(self, request, authorization, storage_index):
@@ -721,7 +721,7 @@ class HTTPServer(object):
     @_authorized_route(
         _app,
         set(),
-        "/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>/corrupt",
+        "/storage/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>/corrupt",
         methods=["POST"],
     )
     def advise_corrupt_share_immutable(
@@ -742,7 +742,7 @@ class HTTPServer(object):
     @_authorized_route(
         _app,
         {Secrets.LEASE_RENEW, Secrets.LEASE_CANCEL, Secrets.WRITE_ENABLER},
-        "/v1/mutable/<storage_index:storage_index>/read-test-write",
+        "/storage/v1/mutable/<storage_index:storage_index>/read-test-write",
         methods=["POST"],
     )
     def mutable_read_test_write(self, request, authorization, storage_index):
@@ -777,7 +777,7 @@ class HTTPServer(object):
     @_authorized_route(
         _app,
         set(),
-        "/v1/mutable/<storage_index:storage_index>/<int(signed=False):share_number>",
+        "/storage/v1/mutable/<storage_index:storage_index>/<int(signed=False):share_number>",
         methods=["GET"],
     )
     def read_mutable_chunk(self, request, authorization, storage_index, share_number):
@@ -801,7 +801,10 @@ class HTTPServer(object):
         return read_range(request, read_data, share_length)

     @_authorized_route(
-        _app, set(), "/v1/mutable/<storage_index:storage_index>/shares", methods=["GET"]
+        _app,
+        set(),
+        "/storage/v1/mutable/<storage_index:storage_index>/shares",
+        methods=["GET"],
     )
     def enumerate_mutable_shares(self, request, authorization, storage_index):
         """List mutable shares for a storage index."""
|
||||
@_authorized_route(
|
||||
_app,
|
||||
set(),
|
||||
"/v1/mutable/<storage_index:storage_index>/<int(signed=False):share_number>/corrupt",
|
||||
"/storage/v1/mutable/<storage_index:storage_index>/<int(signed=False):share_number>/corrupt",
|
||||
methods=["POST"],
|
||||
)
|
||||
def advise_corrupt_share_mutable(
|
||||
|
@@ -397,7 +397,9 @@ class BucketWriter(object):
         """
         Write data at given offset, return whether the upload is complete.
         """
-        # Delay the timeout, since we received data:
+        # Delay the timeout, since we received data; if we get an
+        # AlreadyCancelled error, that means there's a bug in the client and
+        # write() was called after close().
         self._timeout.reset(30 * 60)
         start = self._clock.seconds()
         precondition(not self.closed)
@@ -419,14 +421,18 @@ class BucketWriter(object):
         self._already_written.set(True, offset, end)
         self.ss.add_latency("write", self._clock.seconds() - start)
         self.ss.count("write")
+        return self._is_finished()

-        # Return whether the whole thing has been written. See
-        # https://github.com/mlenzen/collections-extended/issues/169 and
-        # https://github.com/mlenzen/collections-extended/issues/172 for why
-        # it's done this way.
+    def _is_finished(self):
+        """
+        Return whether the whole thing has been written.
+        """
         return sum([mr.stop - mr.start for mr in self._already_written.ranges()]) == self._max_size

     def close(self):
+        # This can't actually be enabled, because it's not backwards compatible
+        # with old Foolscap clients.
+        # assert self._is_finished()
         precondition(not self.closed)
         self._timeout.cancel()
         start = self._clock.seconds()
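The exact-size change ties into the new assertion in ``close()``: with ``uri_extension_size`` known ahead of time, the bytes written must equal ``get_allocated_size()``. A toy illustration of the range-sum completeness check (plain tuples standing in for the ``collections-extended`` range map)::

     ranges = [(0, 16), (16, 32), (32, 48)]           # (start, stop) of written spans
     written = sum(stop - start for start, stop in ranges)
     assert written == 48                             # == the allocated size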
@@ -12,23 +12,19 @@ from future.utils import PY2
 if PY2:
     from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min  # noqa: F401

+import re
 from six.moves import (
     StringIO,
 )

-from testtools import (
-    skipIf,
-)
+from hypothesis.strategies import text
+from hypothesis import given, assume

 from testtools.matchers import (
     Contains,
     Equals,
     HasLength,
 )

-from twisted.python.runtime import (
-    platform,
-)
 from twisted.python.filepath import (
     FilePath,
 )
@@ -44,6 +40,10 @@ from ...scripts.tahoe_run import (
     RunOptions,
     run,
 )
+from ...util.pid import (
+    check_pid_process,
+    InvalidPidFile,
+)

 from ...scripts.runner import (
     parse_options
@@ -151,7 +151,7 @@ class RunTests(SyncTestCase):
     """
     Tests for ``run``.
     """
-    @skipIf(platform.isWindows(), "There are no PID files on Windows.")
+
     def test_non_numeric_pid(self):
         """
         If the pidfile exists but does not contain a numeric value, a complaint to
@@ -159,7 +159,7 @@ class RunTests(SyncTestCase):
         """
         basedir = FilePath(self.mktemp()).asTextMode()
         basedir.makedirs()
-        basedir.child(u"twistd.pid").setContent(b"foo")
+        basedir.child(u"running.process").setContent(b"foo")
         basedir.child(u"tahoe-client.tac").setContent(b"")

         config = RunOptions()
@@ -168,17 +168,30 @@ class RunTests(SyncTestCase):
         config['basedir'] = basedir.path
         config.twistd_args = []

+        reactor = MemoryReactor()
+
         runs = []
-        result_code = run(config, runApp=runs.append)
+        result_code = run(reactor, config, runApp=runs.append)
         self.assertThat(
             config.stderr.getvalue(),
             Contains("found invalid PID file in"),
         )
-        self.assertThat(
-            runs,
-            HasLength(1),
-        )
-        self.assertThat(
-            result_code,
-            Equals(0),
-        )
+        # because the pidfile is invalid we shouldn't get to the
+        # .run() call itself.
+        self.assertThat(runs, Equals([]))
+        self.assertThat(result_code, Equals(1))
+
+    good_file_content_re = re.compile(r"\w[0-9]*\w[0-9]*\w")
+
+    @given(text())
+    def test_pidfile_contents(self, content):
+        """
+        invalid contents for a pidfile raise errors
+        """
+        assume(not self.good_file_content_re.match(content))
+        pidfile = FilePath("pidfile")
+        pidfile.setContent(content.encode("utf8"))
+
+        with self.assertRaises(InvalidPidFile):
+            with check_pid_process(pidfile):
+                pass
@@ -134,7 +134,7 @@ class CLINodeAPI(object):

     @property
     def twistd_pid_file(self):
-        return self.basedir.child(u"twistd.pid")
+        return self.basedir.child(u"running.process")

     @property
     def node_url_file(self):
@@ -145,6 +145,7 @@ def run_cli_native(verb, *args, **kwargs):
     )
     d.addCallback(
         runner.dispatch,
+        reactor,
         stdin=stdin,
         stdout=stdout,
         stderr=stderr,
@@ -251,6 +251,12 @@ class Verifier(GridTestMixin, unittest.TestCase, RepairTestMixin):
                                       self.judge_invisible_corruption)

     def test_corrupt_ueb(self):
+        # Note that in some rare situations this might fail, specifically if
+        # the length of the UEB is corrupted to be a value that is bigger than
+        # the size but less than 2000, it might not get caught... But that's
+        # mostly because in that case it doesn't meaningfully corrupt it. See
+        # _get_uri_extension_the_old_way() in layout.py for where the 2000
+        # number comes from.
         self.basedir = "repairer/Verifier/corrupt_ueb"
         return self._help_test_verify(common._corrupt_uri_extension,
                                       self.judge_invisible_corruption)
@@ -42,16 +42,19 @@ from twisted.trial import unittest

 from twisted.internet import reactor
 from twisted.python import usage
-from twisted.python.runtime import platform
 from twisted.internet.defer import (
     inlineCallbacks,
     DeferredList,
 )
 from twisted.python.filepath import FilePath
+from twisted.python.runtime import (
+    platform,
+)
 from allmydata.util import fileutil, pollmixin
 from allmydata.util.encodingutil import unicode_to_argv
+from allmydata.util.pid import (
+    check_pid_process,
+    _pidfile_to_lockpath,
+    ProcessInTheWay,
+)
 from allmydata.test import common_util
 import allmydata
 from allmydata.scripts.runner import (
@@ -418,9 +421,7 @@ class RunNode(common_util.SignalMixin, unittest.TestCase, pollmixin.PollMixin):

         tahoe.active()

-        # We don't keep track of PIDs in files on Windows.
-        if not platform.isWindows():
-            self.assertTrue(tahoe.twistd_pid_file.exists())
+        self.assertTrue(tahoe.twistd_pid_file.exists())
         self.assertTrue(tahoe.node_url_file.exists())

         # rm this so we can detect when the second incarnation is ready
@@ -493,9 +494,7 @@ class RunNode(common_util.SignalMixin, unittest.TestCase, pollmixin.PollMixin):
         # change on restart
         storage_furl = fileutil.read(tahoe.storage_furl_file.path)

-        # We don't keep track of PIDs in files on Windows.
-        if not platform.isWindows():
-            self.assertTrue(tahoe.twistd_pid_file.exists())
+        self.assertTrue(tahoe.twistd_pid_file.exists())

         # rm this so we can detect when the second incarnation is ready
         tahoe.node_url_file.remove()
@@ -513,22 +512,23 @@ class RunNode(common_util.SignalMixin, unittest.TestCase, pollmixin.PollMixin):
             fileutil.read(tahoe.storage_furl_file.path),
         )

-        if not platform.isWindows():
-            self.assertTrue(
-                tahoe.twistd_pid_file.exists(),
-                "PID file ({}) didn't exist when we expected it to. "
-                "These exist: {}".format(
-                    tahoe.twistd_pid_file,
-                    tahoe.twistd_pid_file.parent().listdir(),
-                ),
-            )
+        self.assertTrue(
+            tahoe.twistd_pid_file.exists(),
+            "PID file ({}) didn't exist when we expected it to. "
+            "These exist: {}".format(
+                tahoe.twistd_pid_file,
+                tahoe.twistd_pid_file.parent().listdir(),
+            ),
+        )
         yield tahoe.stop_and_wait()

+        # twistd.pid should be gone by now -- except on Windows, where
+        # killing a subprocess immediately exits with no chance for
+        # any shutdown code (that is, no Twisted shutdown hooks can
+        # run).
         if not platform.isWindows():
-            # twistd.pid should be gone by now.
             self.assertFalse(tahoe.twistd_pid_file.exists())


     def _remove(self, res, file):
         fileutil.remove(file)
         return res
@@ -610,8 +610,9 @@ class RunNode(common_util.SignalMixin, unittest.TestCase, pollmixin.PollMixin):
             ),
         )

-        # It should not be running.
-        self.assertFalse(tahoe.twistd_pid_file.exists())
+        # It should not be running (but windows shutdown can't run
+        # code so the PID file still exists there).
+        if not platform.isWindows():
+            self.assertFalse(tahoe.twistd_pid_file.exists())

         # Wait for the operation to *complete*. If we got this far it's
@ -621,3 +622,42 @@ class RunNode(common_util.SignalMixin, unittest.TestCase, pollmixin.PollMixin):
        # What's left is a perfect indicator that the process has exited and
        # we won't get blamed for leaving the reactor dirty.
        yield client_running


class PidFileLocking(SyncTestCase):
    """
    Direct tests for allmydata.util.pid functions
    """

    def test_locking(self):
        """
        Fail to create a pidfile if another process has the lock already.
        """
        # this can't just be "our" process because the locking library
        # allows the same process to acquire a lock multiple times.
        pidfile = FilePath(self.mktemp())
        lockfile = _pidfile_to_lockpath(pidfile)

        with open("other_lock.py", "w") as f:
            f.write(
                "\n".join([
                    "import filelock, time, sys",
                    "with filelock.FileLock(sys.argv[1], timeout=1):",
                    "    sys.stdout.write('.\\n')",
                    "    sys.stdout.flush()",
                    "    time.sleep(10)",
                ])
            )
        proc = Popen(
            [sys.executable, "other_lock.py", lockfile.path],
            stdout=PIPE,
            stderr=PIPE,
        )
        # make sure our subprocess has had time to acquire the lock
        # for sure (from the "." it prints)
        proc.stdout.read(2)

        # acquiring the same lock should fail; it is locked by the subprocess
        with self.assertRaises(ProcessInTheWay):
            check_pid_process(pidfile)
        proc.terminate()
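The comment at the top of ``test_locking`` is worth unpacking: ``filelock`` locks are reentrant within a single process, so re-acquiring a lock you already hold succeeds immediately instead of blocking, and only a genuinely separate process can demonstrate contention. A minimal standalone sketch of that behavior (the lock-file name here is arbitrary, chosen only for illustration):

    import filelock

    lock = filelock.FileLock("demo.lock")  # arbitrary lock file, illustration only

    with lock:
        # A second acquisition by the same process does not block; filelock
        # keeps a per-lock counter and treats this as a nested acquire.
        with lock:
            print("acquired twice from the same process")
    # A *different* process trying to acquire "demo.lock" inside the outer
    # block would wait (or raise filelock.Timeout), which is exactly the
    # situation test_locking sets up with its helper subprocess.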
@ -463,7 +463,7 @@ class BucketProxy(unittest.TestCase):
                              block_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size_max=500)
                              uri_extension_size=500)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)

    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
@ -494,7 +494,7 @@ class BucketProxy(unittest.TestCase):
                       block_size=25,
                       num_segments=4,
                       num_share_hashes=3,
                       uri_extension_size_max=len(uri_extension))
                       uri_extension_size=len(uri_extension))

        d = bp.put_header()
        d.addCallback(lambda res: bp.put_block(0, b"a"*25))
@ -255,7 +255,7 @@ class TestApp(object):
        else:
            return "BAD: {}".format(authorization)

    @_authorized_route(_app, set(), "/v1/version", methods=["GET"])
    @_authorized_route(_app, set(), "/storage/v1/version", methods=["GET"])
    def bad_version(self, request, authorization):
        """Return version result that violates the expected schema."""
        request.setHeader("content-type", CBOR_MIME_TYPE)
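The remaining HTTP-API hunks are mechanical renames from ``/v1/...`` to ``/storage/v1/...``, tracking the documentation change above. For orientation, a raw request against the renamed version endpoint might look roughly like the sketch below; the server URL and swissnum are hypothetical placeholders, and the Authorization header shape is assumed from the GBS spec text rather than from this diff:

    # A rough standalone sketch, not the project's real client code.
    import base64
    from urllib.request import Request, urlopen

    base_url = "https://storage.example:8080"   # hypothetical server address
    swissnum = b"not-a-real-secret"             # hypothetical client secret

    req = Request(
        base_url + "/storage/v1/version",       # note the new /storage prefix
        headers={
            # GBS authorizes requests with the swissnum; this exact header
            # format is an assumption based on the spec, not this diff.
            "Authorization": "Tahoe-LAFS " + base64.b64encode(swissnum).decode("ascii"),
        },
    )
    with urlopen(req) as resp:
        print(resp.status, resp.read())         # body is CBOR-encoded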
@ -534,7 +534,7 @@ class GenericHTTPAPITests(SyncTestCase):
        lease_secret = urandom(32)
        storage_index = urandom(16)
        url = self.http.client.relative_url(
            "/v1/immutable/" + _encode_si(storage_index)
            "/storage/v1/immutable/" + _encode_si(storage_index)
        )
        message = {"bad-message": "missing expected keys"}
@ -1418,7 +1418,7 @@ class SharedImmutableMutableTestsMixin:
            self.http.client.request(
                "GET",
                self.http.client.relative_url(
                    "/v1/{}/{}/1".format(self.KIND, _encode_si(storage_index))
                    "/storage/v1/{}/{}/1".format(self.KIND, _encode_si(storage_index))
                ),
            )
        )
@ -1441,7 +1441,7 @@ class SharedImmutableMutableTestsMixin:
            self.http.client.request(
                "GET",
                self.http.client.relative_url(
                    "/v1/{}/{}/1".format(self.KIND, _encode_si(storage_index))
                    "/storage/v1/{}/{}/1".format(self.KIND, _encode_si(storage_index))
                ),
                headers=headers,
            )
@ -983,7 +983,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
        num_segments = encoder.get_param("num_segments")
        d = selector.get_shareholders(broker, sh, storage_index,
                                      share_size, block_size, num_segments,
                                      10, 3, 4)
                                      10, 3, 4, encoder.get_uri_extension_size())
        def _have_shareholders(upload_trackers_and_already_servers):
            (upload_trackers, already_servers) = upload_trackers_and_already_servers
            assert servers_to_break <= len(upload_trackers)
120
src/allmydata/util/pid.py
Normal file
@ -0,0 +1,120 @@
import psutil

# the docs are a little misleading, but this is either WindowsFileLock
# or UnixFileLock depending upon the platform we're currently on
from filelock import FileLock, Timeout


class ProcessInTheWay(Exception):
    """
    our pidfile points at a running process
    """


class InvalidPidFile(Exception):
    """
    our pidfile isn't well-formed
    """


class CannotRemovePidFile(Exception):
    """
    something went wrong removing the pidfile
    """


def _pidfile_to_lockpath(pidfile):
    """
    internal helper.
    :returns FilePath: a path to use for file-locking the given pidfile
    """
    return pidfile.sibling("{}.lock".format(pidfile.basename()))


def parse_pidfile(pidfile):
    """
    :param FilePath pidfile:
    :returns tuple: 2-tuple of pid, creation-time as int, float
    :raises InvalidPidFile: on error
    """
    with pidfile.open("r") as f:
        content = f.read().decode("utf8").strip()
    try:
        pid, starttime = content.split()
        pid = int(pid)
        starttime = float(starttime)
    except ValueError:
        raise InvalidPidFile(
            "found invalid PID file in {}".format(
                pidfile
            )
        )
    return pid, starttime


def check_pid_process(pidfile):
    """
    If another instance appears to be running already, raise an
    exception. Otherwise, write our PID + start time to the pidfile
    and arrange to delete it upon exit.

    :param FilePath pidfile: the file to read/write our PID from.

    :raises ProcessInTheWay: if a running process exists at our PID
    """
    lock_path = _pidfile_to_lockpath(pidfile)

    try:
        # a short timeout is fine, this lock should only be active
        # while someone is reading or deleting the pidfile .. and
        # facilitates testing the locking itself.
        with FileLock(lock_path.path, timeout=2):
            # check if we have another instance running already
            if pidfile.exists():
                pid, starttime = parse_pidfile(pidfile)
                try:
                    # if any other process is running at that PID, let the
                    # user decide if this is another legitimate
                    # instance. Automated programs may use the start-time to
                    # help decide this (if the PID is merely recycled, the
                    # start-time won't match).
                    psutil.Process(pid)
                    raise ProcessInTheWay(
                        "A process is already running as PID {}".format(pid)
                    )
                except psutil.NoSuchProcess:
                    print(
                        "'{pidpath}' refers to {pid} that isn't running".format(
                            pidpath=pidfile.path,
                            pid=pid,
                        )
                    )
                    # nothing is running at that PID so it must be a stale file
                    pidfile.remove()

            # write our PID + start-time to the pid-file
            proc = psutil.Process()
            with pidfile.open("w") as f:
                f.write("{} {}\n".format(proc.pid, proc.create_time()).encode("utf8"))
    except Timeout:
        raise ProcessInTheWay(
            "Another process is still locking {}".format(pidfile.path)
        )


def cleanup_pidfile(pidfile):
    """
    Remove the pidfile specified (respecting locks). If anything at
    all goes wrong, `CannotRemovePidFile` is raised.
    """
    lock_path = _pidfile_to_lockpath(pidfile)
    with FileLock(lock_path.path):
        try:
            pidfile.remove()
        except Exception as e:
            raise CannotRemovePidFile(
                "Couldn't remove '{pidfile}': {err}.".format(
                    pidfile=pidfile.path,
                    err=e,
                )
            )
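Taken together, a caller is expected to bracket the node's lifetime with these helpers: claim the pidfile before starting, remove it on the way out. A minimal sketch of that usage follows; the pidfile location and the try/finally shape are illustrative only, not the exact wiring ``tahoe run`` uses:

    from twisted.python.filepath import FilePath
    from allmydata.util.pid import check_pid_process, cleanup_pidfile

    pidfile = FilePath("running.process")  # illustrative location

    # Raises ProcessInTheWay if a live process already owns this pidfile;
    # if the recorded PID is not running, the stale file is removed first
    # (with a notice printed) and we proceed.
    check_pid_process(pidfile)
    try:
        pass  # ... run the node ...
    finally:
        # remove the pidfile under the same file lock used to create it
        cleanup_pidfile(pidfile)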