From 40d649b3b202114bc0ce46fccb194b662b4991e3 Mon Sep 17 00:00:00 2001
From: Itamar Turner-Trauring
Date: Mon, 23 Oct 2023 09:44:11 -0400
Subject: [PATCH 1/7] Make another slowish operation non-blocking

---
 src/allmydata/mutable/retrieve.py | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/src/allmydata/mutable/retrieve.py b/src/allmydata/mutable/retrieve.py
index 41c87ac59..54ada2ca9 100644
--- a/src/allmydata/mutable/retrieve.py
+++ b/src/allmydata/mutable/retrieve.py
@@ -928,12 +928,20 @@ class Retrieve(object):
             reason,
         )
 
-
-    def _try_to_validate_privkey(self, enc_privkey, reader, server):
+    @deferredutil.async_to_deferred
+    async def _try_to_validate_privkey(self, enc_privkey, reader, server):
         node_writekey = self._node.get_writekey()
-        alleged_privkey_s = decrypt_privkey(node_writekey, enc_privkey)
-        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
-        if alleged_writekey != node_writekey:
+
+        def get_privkey():
+            alleged_privkey_s = decrypt_privkey(node_writekey, enc_privkey)
+            alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
+            if alleged_writekey != node_writekey:
+                return None
+            privkey, _ = rsa.create_signing_keypair_from_string(alleged_privkey_s)
+            return privkey
+
+        privkey = await defer_to_thread(get_privkey)
+        if privkey is None:
             self.log("invalid privkey from %s shnum %d" %
                      (reader, reader.shnum),
                      level=log.WEIRD, umid="YIw4tA")
@@ -950,7 +958,6 @@ class Retrieve(object):
         # it's good
         self.log("got valid privkey from shnum %d on reader %s" %
                  (reader.shnum, reader))
-        privkey, _ = rsa.create_signing_keypair_from_string(alleged_privkey_s)
         self._node._populate_encprivkey(enc_privkey)
         self._node._populate_privkey(privkey)
         self._need_privkey = False

From 101453cd56acba66817b0c8e7f25bd6c8889c53c Mon Sep 17 00:00:00 2001
From: Itamar Turner-Trauring
Date: Mon, 23 Oct 2023 09:57:32 -0400
Subject: [PATCH 2/7] Make operation non-blocking (assuming GIL is released)

---
 src/allmydata/mutable/filenode.py | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/src/allmydata/mutable/filenode.py b/src/allmydata/mutable/filenode.py
index 00b31c52b..ede74d249 100644
--- a/src/allmydata/mutable/filenode.py
+++ b/src/allmydata/mutable/filenode.py
@@ -14,6 +14,7 @@ from allmydata.interfaces import IMutableFileNode, ICheckable, ICheckResults, \
      IMutableFileVersion, IWriteable
 from allmydata.util import hashutil, log, consumer, deferredutil, mathutil
 from allmydata.util.assertutil import precondition
+from allmydata.util.cputhreadpool import defer_to_thread
 from allmydata.uri import WriteableSSKFileURI, ReadonlySSKFileURI, \
                           WriteableMDMFFileURI, ReadonlyMDMFFileURI
 from allmydata.monitor import Monitor
@@ -128,7 +129,8 @@ class MutableFileNode(object):
         return self
 
 
-    def create_with_keys(self, keypair, contents,
+    @deferredutil.async_to_deferred
+    async def create_with_keys(self, keypair, contents,
                          version=SDMF_VERSION):
         """Call this to create a brand-new mutable file. It will create the
         shares, find homes for them, and upload the initial contents (created
@@ -137,8 +139,8 @@ class MutableFileNode(object):
         use) when it completes.
""" self._pubkey, self._privkey = keypair - self._writekey, self._encprivkey, self._fingerprint = derive_mutable_keys( - keypair, + self._writekey, self._encprivkey, self._fingerprint = await defer_to_thread( + derive_mutable_keys, keypair ) if version == MDMF_VERSION: self._uri = WriteableMDMFFileURI(self._writekey, self._fingerprint) @@ -149,7 +151,7 @@ class MutableFileNode(object): self._readkey = self._uri.readkey self._storage_index = self._uri.storage_index initial_contents = self._get_initial_contents(contents) - return self._upload(initial_contents, None) + return await self._upload(initial_contents, None) def _get_initial_contents(self, contents): if contents is None: From 0c2db2d5a87ffe2bd0693d239fad9d2b27057e2c Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 15 Nov 2023 15:53:25 -0500 Subject: [PATCH 3/7] Make sure FEC does some work --- benchmarks/conftest.py | 2 +- newsfragments/4072.feature | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 newsfragments/4072.feature diff --git a/benchmarks/conftest.py b/benchmarks/conftest.py index 926978a29..972d89b48 100644 --- a/benchmarks/conftest.py +++ b/benchmarks/conftest.py @@ -101,7 +101,7 @@ def client_node(request, grid, storage_nodes, number_of_nodes) -> Client: "client_node", needed=number_of_nodes, happy=number_of_nodes, - total=number_of_nodes, + total=number_of_nodes + 3, # Make sure FEC does some work ) ) print(f"Client node pid: {client_node.process.transport.pid}") diff --git a/newsfragments/4072.feature b/newsfragments/4072.feature new file mode 100644 index 000000000..3b0db7a02 --- /dev/null +++ b/newsfragments/4072.feature @@ -0,0 +1 @@ +Continued work to make Tahoe-LAFS take advantage of multiple CPUs. \ No newline at end of file From 4c03d931bd9f3fa0defb57dcdd5d7f41b4ae3a61 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 6 Dec 2023 15:56:58 -0500 Subject: [PATCH 4/7] Accept memoryview --- src/allmydata/crypto/aes.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/allmydata/crypto/aes.py b/src/allmydata/crypto/aes.py index ad7cfcba4..a67501eb0 100644 --- a/src/allmydata/crypto/aes.py +++ b/src/allmydata/crypto/aes.py @@ -87,8 +87,8 @@ def encrypt_data(encryptor, plaintext): """ _validate_cryptor(encryptor, encrypt=True) - if not isinstance(plaintext, six.binary_type): - raise ValueError('Plaintext must be bytes') + if not isinstance(plaintext, (six.binary_type, memoryview)): + raise ValueError(f'Plaintext must be bytes or memoryview: {type(plaintext)}') return encryptor.update(plaintext) @@ -126,8 +126,8 @@ def decrypt_data(decryptor, plaintext): """ _validate_cryptor(decryptor, encrypt=False) - if not isinstance(plaintext, six.binary_type): - raise ValueError('Plaintext must be bytes') + if not isinstance(plaintext, (six.binary_type, memoryview)): + raise ValueError(f'Plaintext must be bytes or memoryview: {type(plaintext)}') return decryptor.update(plaintext) From 9a1e73892e1b58cb6da34b205cf2438811ea9a82 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 6 Dec 2023 15:58:30 -0500 Subject: [PATCH 5/7] Run joining in a thread --- src/allmydata/mutable/retrieve.py | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/src/allmydata/mutable/retrieve.py b/src/allmydata/mutable/retrieve.py index 54ada2ca9..b4db6a092 100644 --- a/src/allmydata/mutable/retrieve.py +++ b/src/allmydata/mutable/retrieve.py @@ -4,8 +4,9 @@ Ported to Python 3. 
 from __future__ import annotations
 
 import time
-
+from io import BytesIO
 from itertools import count
+
 from zope.interface import implementer
 from twisted.internet import defer
 from twisted.python import failure
@@ -873,11 +874,26 @@ class Retrieve(object):
         shares = shares[:self._required_shares]
         self.log("decoding segment %d" % segnum)
         if segnum == self._num_segments - 1:
-            d = defer.maybeDeferred(self._tail_decoder.decode, shares, shareids)
+            d = self._tail_decoder.decode(shares, shareids)
         else:
-            d = defer.maybeDeferred(self._segment_decoder.decode, shares, shareids)
-        def _process(buffers):
-            segment = b"".join(buffers)
+            d = self._segment_decoder.decode(shares, shareids)
+
+        # For larger shares, this can take a few milliseconds. As such, we want
+        # to unblock the event loop. Even if it doesn't release the GIL, if it
+        # really takes too long it will implicitly release it.
+        def _join(buffers):
+            f = BytesIO()
+            for b in buffers:
+                f.write(b)
+            return f.getbuffer()
+
+        @deferredutil.async_to_deferred
+        async def _got_buffers(buffers):
+            return await defer_to_thread(_join, buffers)
+
+        d.addCallback(_got_buffers)
+
+        def _process(segment):
             self.log(format="now decoding segment %(segnum)s of %(numsegs)s",
                      segnum=segnum,
                      numsegs=self._num_segments,

From 81a5ae6f461cb615bdc2ae118b6b5223fda6b3ba Mon Sep 17 00:00:00 2001
From: Itamar Turner-Trauring
Date: Wed, 6 Dec 2023 16:01:14 -0500
Subject: [PATCH 6/7] Simplify

---
 src/allmydata/mutable/retrieve.py | 13 +++----------
 1 file changed, 3 insertions(+), 10 deletions(-)

diff --git a/src/allmydata/mutable/retrieve.py b/src/allmydata/mutable/retrieve.py
index b4db6a092..45d7766ee 100644
--- a/src/allmydata/mutable/retrieve.py
+++ b/src/allmydata/mutable/retrieve.py
@@ -4,7 +4,6 @@ Ported to Python 3.
 from __future__ import annotations
 
 import time
-from io import BytesIO
 from itertools import count
 
 from zope.interface import implementer
@@ -879,17 +878,11 @@ class Retrieve(object):
             d = self._segment_decoder.decode(shares, shareids)
 
         # For larger shares, this can take a few milliseconds. As such, we want
-        # to unblock the event loop. Even if it doesn't release the GIL, if it
-        # really takes too long it will implicitly release it.
-        def _join(buffers):
-            f = BytesIO()
-            for b in buffers:
-                f.write(b)
-            return f.getbuffer()
-
+        # to unblock the event loop. In newer Python b"".join() will release
+        # the GIL: https://github.com/python/cpython/issues/80232
         @deferredutil.async_to_deferred
         async def _got_buffers(buffers):
-            return await defer_to_thread(_join, buffers)
+            return await defer_to_thread(lambda: b"".join(buffers))
 
         d.addCallback(_got_buffers)
 

From 2783bd8b7799f6c57a7100e8920bc5adf88c0a52 Mon Sep 17 00:00:00 2001
From: Itamar Turner-Trauring
Date: Thu, 7 Dec 2023 16:39:30 -0500
Subject: [PATCH 7/7] Unnecessary maybeDeferred

---
 src/allmydata/immutable/downloader/node.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/allmydata/immutable/downloader/node.py b/src/allmydata/immutable/downloader/node.py
index a1ef4b485..efa3e09eb 100644
--- a/src/allmydata/immutable/downloader/node.py
+++ b/src/allmydata/immutable/downloader/node.py
@@ -419,7 +419,7 @@ class DownloadNode(object):
 
     def process_blocks(self, segnum, blocks):
        start = now()
-        d = defer.maybeDeferred(self._decode_blocks, segnum, blocks)
+        d = self._decode_blocks(segnum, blocks)
         d.addCallback(self._check_ciphertext_hash, segnum)
         def _deliver(result):
             log.msg(format="delivering segment(%(segnum)d)",
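
The seven patches above all apply the same pattern: run a CPU-heavy step (key derivation,
privkey validation, buffer joining) on a worker thread, then resume the Deferred chain with
its result so the reactor stays responsive. The stand-alone sketch below illustrates that
pattern only; it is not code from this series. It uses Twisted's stock
twisted.internet.threads.deferToThread in place of Tahoe's cputhreadpool.defer_to_thread
helper, and the function names and buffer sizes are made up for illustration.

    from twisted.internet import defer, reactor
    from twisted.internet.threads import deferToThread


    def _expensive_join(buffers):
        # CPU-bound work: joining many buffers can take milliseconds for large
        # segments, long enough to stall the reactor thread if done inline.
        return b"".join(buffers)


    async def assemble_segment(buffers):
        # Run the join on a worker thread and resume here with its result,
        # leaving the reactor free to service other events in the meantime.
        segment = await deferToThread(_expensive_join, buffers)
        return len(segment)


    def main():
        buffers = [b"x" * 4096] * 256  # illustrative data only
        d = defer.ensureDeferred(assemble_segment(buffers))
        d.addCallback(lambda n: print("assembled", n, "bytes"))
        d.addBoth(lambda _: reactor.stop())


    if __name__ == "__main__":
        reactor.callWhenRunning(main)
        reactor.run()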