Merge pull request #1251 from tahoe-lafs/3946-less-chatty-downloads

Make immutable downloads less chatty. Fixes ticket:3946.

commit 0b0af62b16
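
At a glance, the change splits the old shared DEFAULT_MAX_SEGMENT_SIZE constant in two: DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE (raised from 128 KiB to 1 MiB) and DEFAULT_MUTABLE_MAX_SEGMENT_SIZE (kept at 128 KiB). Since an immutable file is fetched one segment at a time, larger segments mean proportionally fewer request/response round trips per download. A rough back-of-the-envelope sketch of the effect on segment counts (illustrative only, not code from the diff):

    import math

    OLD_DEFAULT = 128 * 1024    # previous shared default: 128 KiB
    NEW_DEFAULT = 1024 * 1024   # new immutable default: 1 MiB

    def num_segments(filesize: int, segment_size: int) -> int:
        # a file is downloaded one segment at a time, so fewer
        # segments means fewer per-segment round trips
        return max(1, math.ceil(filesize / segment_size))

    for mb in (1, 10, 100):
        size = mb * 1024 * 1024
        print(f"{mb:>3} MB file: {num_segments(size, OLD_DEFAULT):>4} segments"
              f" -> {num_segments(size, NEW_DEFAULT):>3} segments")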
integration/test_get_put.py

@@ -3,11 +3,14 @@ Integration tests for getting and putting files, including reading from stdin
 and stdout.
 """

-from subprocess import Popen, PIPE
+from subprocess import Popen, PIPE, check_output
+import sys

 import pytest
+from pytest_twisted import ensureDeferred
+from twisted.internet import reactor

-from .util import run_in_thread, cli
+from .util import run_in_thread, cli, reconfigure

 DATA = b"abc123 this is not utf-8 decodable \xff\x00\x33 \x11"
 try:
@@ -62,3 +65,51 @@ def test_get_to_stdout(alice, get_put_alias, tmpdir):
     )
     assert p.stdout.read() == DATA
     assert p.wait() == 0
+
+
+@pytest.mark.skipif(
+    sys.platform.startswith("win"),
+    reason="reconfigure() has issues on Windows"
+)
+@ensureDeferred
+async def test_upload_download_immutable_different_default_max_segment_size(alice, get_put_alias, tmpdir, request):
+    """
+    Tahoe-LAFS used to have a default max segment size of 128KB, and it is now
+    1MB. Test that an upload created when 128KB was the default can be
+    downloaded with 1MB as the default (i.e. old uploader, new downloader), and
+    vice versa (new uploader, old downloader).
+    """
+    tempfile = tmpdir.join("file")
+    large_data = DATA * 100_000
+    assert len(large_data) > 2 * 1024 * 1024
+    with tempfile.open("wb") as f:
+        f.write(large_data)
+
+    async def set_segment_size(segment_size):
+        await reconfigure(
+            reactor,
+            request,
+            alice,
+            (1, 1, 1),
+            None,
+            max_segment_size=segment_size
+        )
+
+    # 1. Upload file 1 with default segment size set to 1MB
+    await set_segment_size(1024 * 1024)
+    cli(alice, "put", str(tempfile), "getput:seg1024kb")
+
+    # 2. Download file 1 with default segment size set to 128KB
+    await set_segment_size(128 * 1024)
+    assert large_data == check_output(
+        ["tahoe", "--node-directory", alice.node_dir, "get", "getput:seg1024kb", "-"]
+    )
+
+    # 3. Upload file 2 with default segment size set to 128KB
+    cli(alice, "put", str(tempfile), "getput:seg128kb")
+
+    # 4. Download file 2 with default segment size set to 1MB
+    await set_segment_size(1024 * 1024)
+    assert large_data == check_output(
+        ["tahoe", "--node-directory", alice.node_dir, "get", "getput:seg128kb", "-"]
+    )
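
As a usage note (assuming the project's usual pytest-based integration workflow; the exact invocation is not stated in the diff), the new test can be run on its own with something like:

    python -m pytest integration/test_get_put.py -k test_upload_download_immutable_different_default_max_segment_size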
integration/test_vectors.py

@@ -36,7 +36,8 @@ async def test_capability(reactor, request, alice, case, expected):
     computed value.
     """
     # rewrite alice's config to match params and convergence
-    await reconfigure(reactor, request, alice, (1, case.params.required, case.params.total), case.convergence)
+    await reconfigure(
+        reactor, request, alice, (1, case.params.required, case.params.total), case.convergence, case.segment_size)

     # upload data in the correct format
     actual = upload(alice, case.fmt, case.data)

@@ -110,7 +111,8 @@ async def generate(
         request,
         alice,
         (happy, case.params.required, case.params.total),
-        case.convergence
+        case.convergence,
+        case.segment_size
     )

     # Give the format a chance to make an RSA key if it needs it.
integration/util.py

@@ -46,6 +46,7 @@ from allmydata.util.configutil import (
     write_config,
 )
 from allmydata import client
+from allmydata.interfaces import DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE

 import pytest_twisted

@@ -729,11 +730,16 @@ def upload(alice: TahoeProcess, fmt: CHK | SSK, data: bytes) -> str:
     return cli(*argv).decode("utf-8").strip()


-async def reconfigure(reactor, request, node: TahoeProcess, params: tuple[int, int, int], convergence: None | bytes) -> None:
+async def reconfigure(reactor, request, node: TahoeProcess,
+                      params: tuple[int, int, int],
+                      convergence: None | bytes,
+                      max_segment_size: None | int = None) -> None:
     """
     Reconfigure a Tahoe-LAFS node with different ZFEC parameters and
     convergence secret.

+    TODO This appears to have issues on Windows.
+
     If the current configuration is different from the specified
     configuration, the node will be restarted so it takes effect.

@@ -769,7 +775,22 @@ async def reconfigure(reactor, request, node: TahoeProcess, params: tuple[int, i
         changed = True
         config.write_private_config("convergence", base32.b2a(convergence))

+    if max_segment_size is not None:
+        cur_segment_size = int(config.get_config("client", "shares._max_immutable_segment_size_for_testing", DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE))
+        if cur_segment_size != max_segment_size:
+            changed = True
+            config.set_config(
+                "client",
+                "shares._max_immutable_segment_size_for_testing",
+                str(max_segment_size)
+            )
+
     if changed:
+        # TODO reconfigure() seems to have issues on Windows. If you need to
+        # use it there, delete this assert and try to figure out what's going
+        # on...
+        assert not sys.platform.startswith("win")
+
         # restart the node
         print(f"Restarting {node.node_dir} for ZFEC reconfiguration")
         await node.restart_async(reactor, request)
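
For context, this extended helper is what the new test's set_segment_size() wraps: a single call can now flip a running node back to the old 128 KiB default. A minimal sketch of such a call, mirroring the arguments the test in integration/test_get_put.py passes:

    # params=(1, 1, 1) and convergence=None are the same values the new test
    # uses; only max_segment_size is the newly added keyword
    await reconfigure(reactor, request, alice, (1, 1, 1), None,
                      max_segment_size=128 * 1024)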
misc/simulators/sizes.py

@@ -5,7 +5,7 @@ from __future__ import print_function
 import sys, math
 from allmydata import uri, storage
 from allmydata.immutable import upload
-from allmydata.interfaces import DEFAULT_MAX_SEGMENT_SIZE
+from allmydata.interfaces import DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE
 from allmydata.util import mathutil

 def roundup(size, blocksize=4096):

@@ -26,7 +26,7 @@ class BigFakeString(object):
     def tell(self):
         return self.fp

-def calc(filesize, params=(3,7,10), segsize=DEFAULT_MAX_SEGMENT_SIZE):
+def calc(filesize, params=(3,7,10), segsize=DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE):
     num_shares = params[2]
     if filesize <= upload.Uploader.URI_LIT_SIZE_THRESHOLD:
         urisize = len(uri.LiteralFileURI("A"*filesize).to_string())
newsfragments/3946.bugfix (new file, 1 line)

@@ -0,0 +1 @@
+Downloads of large immutables should now finish much faster.
src/allmydata/client.py

@@ -50,7 +50,7 @@ from allmydata.interfaces import (
     IStatsProducer,
     SDMF_VERSION,
     MDMF_VERSION,
-    DEFAULT_MAX_SEGMENT_SIZE,
+    DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE,
     IFoolscapStoragePlugin,
     IAnnounceableStorageServer,
 )

@@ -88,6 +88,7 @@ _client_config = configutil.ValidConfiguration(
             "shares.happy",
             "shares.needed",
             "shares.total",
+            "shares._max_immutable_segment_size_for_testing",
             "storage.plugins",
         ),
         "storage": (

@@ -606,7 +607,7 @@ class _Client(node.Node, pollmixin.PollMixin):
     DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                    "happy": 7,
                                    "n": 10,
-                                   "max_segment_size": DEFAULT_MAX_SEGMENT_SIZE,
+                                   "max_segment_size": DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE,
                                    }

     def __init__(self, config, main_tub, i2p_provider, tor_provider, introducer_clients,

@@ -896,6 +897,13 @@ class _Client(node.Node, pollmixin.PollMixin):
         DEP["k"] = int(self.config.get_config("client", "shares.needed", DEP["k"]))
         DEP["n"] = int(self.config.get_config("client", "shares.total", DEP["n"]))
         DEP["happy"] = int(self.config.get_config("client", "shares.happy", DEP["happy"]))
+        # At the moment this is only used for testing, thus the janky config
+        # attribute name.
+        DEP["max_segment_size"] = int(self.config.get_config(
+            "client",
+            "shares._max_immutable_segment_size_for_testing",
+            DEP["max_segment_size"])
+        )

         # for the CLI to authenticate to local JSON endpoints
         self._create_auth_token()
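
For reference, the new knob is read from the [client] section of a node's tahoe.cfg, alongside the existing shares.* options validated above. A hypothetical snippet forcing the old 128 KiB segment size for testing (only the option name comes from the diff; the surrounding values are placeholders):

    [client]
    shares.needed = 3
    shares.happy = 7
    shares.total = 10
    # testing-only override; falls back to DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE (1 MiB)
    shares._max_immutable_segment_size_for_testing = 131072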
src/allmydata/immutable/downloader/node.py

@@ -19,7 +19,7 @@ from foolscap.api import eventually
 from allmydata import uri
 from allmydata.codec import CRSDecoder
 from allmydata.util import base32, log, hashutil, mathutil, observer
-from allmydata.interfaces import DEFAULT_MAX_SEGMENT_SIZE
+from allmydata.interfaces import DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE
 from allmydata.hashtree import IncompleteHashTree, BadHashError, \
      NotEnoughHashesError

@@ -49,6 +49,8 @@ class DownloadNode(object):
     """Internal class which manages downloads and holds state. External
     callers use CiphertextFileNode instead."""

+    default_max_segment_size = DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE
+
     # Share._node points to me
     def __init__(self, verifycap, storage_broker, secret_holder,
                  terminator, history, download_status):

@@ -76,7 +78,7 @@ class DownloadNode(object):
         # .guessed_segment_size, .guessed_num_segments, and
         # .ciphertext_hash_tree (with a dummy, to let us guess which hashes
         # we'll need)
-        self._build_guessed_tables(DEFAULT_MAX_SEGMENT_SIZE)
+        self._build_guessed_tables(self.default_max_segment_size)

         # filled in when we parse a valid UEB
         self.have_UEB = False
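
Why bumping this default stays backwards compatible: as the comments above note, the value only seeds guessed hash-tree tables, and the authoritative segment size is filled in once a valid URI extension block (UEB) is parsed. A schematic sketch of that guess-then-correct flow (illustrative only, not the real DownloadNode code):

    class SegmentSizeGuess:
        """Illustrative: a wrong guess costs extra hash fetches, not correctness."""

        def __init__(self, default_segment_size: int) -> None:
            # used only to decide which hashes to request speculatively
            self.guessed_segment_size = default_segment_size
            self.segment_size = None

        def on_valid_ueb(self, ueb_segment_size: int) -> None:
            # the value from the share's UEB always wins
            self.segment_size = ueb_segment_size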
src/allmydata/immutable/upload.py

@@ -48,7 +48,7 @@ from allmydata.util.rrefutil import add_version_to_remote_reference
 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
      IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
      NoServersError, InsufficientVersionError, UploadUnhappinessError, \
-     DEFAULT_MAX_SEGMENT_SIZE, IPeerSelector
+     DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE, IPeerSelector
 from allmydata.immutable import layout

 from io import BytesIO

@@ -1692,7 +1692,7 @@ class AssistedUploader(object):

 class BaseUploadable(object):
     # this is overridden by max_segment_size
-    default_max_segment_size = DEFAULT_MAX_SEGMENT_SIZE
+    default_max_segment_size = DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE
     default_params_set = False

     max_segment_size = None
src/allmydata/interfaces.py

@@ -41,7 +41,8 @@ URI = StringConstraint(300) # kind of arbitrary

 MAX_BUCKETS = 256 # per peer -- zfec offers at most 256 shares per file

-DEFAULT_MAX_SEGMENT_SIZE = 128*1024
+# The default size for segments of new CHK ("immutable") uploads.
+DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE = 1024*1024

 ShareData = StringConstraint(None)
 URIExtensionData = StringConstraint(1000)
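
The net effect of the rename is that each file type now has its own default. A condensed view (values taken from this diff; the mutable constant is defined in src/allmydata/mutable/publish.py below):

    # src/allmydata/interfaces.py -- immutable (CHK) files; raised by this PR
    DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE = 1024 * 1024   # 1 MiB, was 128*1024

    # src/allmydata/mutable/publish.py -- mutable (SDMF/MDMF) files; unchanged
    KiB = 1024
    DEFAULT_MUTABLE_MAX_SEGMENT_SIZE = 128 * KiB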
src/allmydata/mutable/publish.py

@@ -36,7 +36,7 @@ from allmydata.mutable.layout import get_version_from_checkstring,\
      SDMFSlotWriteProxy

 KiB = 1024
-DEFAULT_MAX_SEGMENT_SIZE = 128 * KiB
+DEFAULT_MUTABLE_MAX_SEGMENT_SIZE = 128 * KiB
 PUSHING_BLOCKS_STATE = 0
 PUSHING_EVERYTHING_ELSE_STATE = 1
 DONE_STATE = 2

@@ -367,7 +367,7 @@ class Publish(object):

         self.data = newdata
         self.datalength = newdata.get_size()
-        #if self.datalength >= DEFAULT_MAX_SEGMENT_SIZE:
+        #if self.datalength >= DEFAULT_MUTABLE_MAX_SEGMENT_SIZE:
         #    self._version = MDMF_VERSION
         #else:
         #    self._version = SDMF_VERSION

@@ -551,7 +551,7 @@ class Publish(object):

     def setup_encoding_parameters(self, offset=0):
         if self._version == MDMF_VERSION:
-            segment_size = DEFAULT_MAX_SEGMENT_SIZE # 128 KiB by default
+            segment_size = DEFAULT_MUTABLE_MAX_SEGMENT_SIZE # 128 KiB by default
         else:
             segment_size = self.datalength # SDMF is only one segment
         # this must be a multiple of self.required_shares
src/allmydata/test/mutable/test_update.py

@@ -20,7 +20,7 @@ from testtools.matchers import (
 from twisted.internet import defer
 from allmydata.interfaces import MDMF_VERSION
 from allmydata.mutable.filenode import MutableFileNode
-from allmydata.mutable.publish import MutableData, DEFAULT_MAX_SEGMENT_SIZE
+from allmydata.mutable.publish import MutableData, DEFAULT_MUTABLE_MAX_SEGMENT_SIZE
 from ..no_network import GridTestMixin
 from .. import common_util as testutil

@@ -180,7 +180,7 @@ class Update(GridTestMixin, AsyncTestCase, testutil.ShouldFailMixin):
         # long -- this is 7 segments in the default segment size. So we
         # need to add 2 segments worth of data to push it over a
         # power-of-two boundary.
-        segment = b"a" * DEFAULT_MAX_SEGMENT_SIZE
+        segment = b"a" * DEFAULT_MUTABLE_MAX_SEGMENT_SIZE
         new_data = self.data + (segment * 2)
         d0 = self.do_upload_mdmf()
         def _run(ign):

@@ -232,9 +232,9 @@ class Update(GridTestMixin, AsyncTestCase, testutil.ShouldFailMixin):
         return d0

     def test_multiple_segment_replace(self):
-        replace_offset = 2 * DEFAULT_MAX_SEGMENT_SIZE
+        replace_offset = 2 * DEFAULT_MUTABLE_MAX_SEGMENT_SIZE
         new_data = self.data[:replace_offset]
-        new_segment = b"a" * DEFAULT_MAX_SEGMENT_SIZE
+        new_segment = b"a" * DEFAULT_MUTABLE_MAX_SEGMENT_SIZE
         new_data += 2 * new_segment
         new_data += b"replaced"
         rest_offset = len(new_data)