Merge pull request #1130 from tahoe-lafs/3801-no-overlapping-writes-immutable-upload
Disallow conflicting overlapping writes when doing an immutable upload. Fixes ticket:3801
Commit fadfbcad99
@@ -497,11 +497,8 @@ If any one of these requests fails then at most 128KiB of upload work needs to b

The server must recognize when all of the data has been received and mark the share as complete
(which it can do because it was informed of the size when the storage index was initialized).
Clients should upload chunks in re-assembly order.

* When a chunk that does not complete the share is successfully uploaded the response is ``OK``.
* When the chunk that completes the share is successfully uploaded the response is ``CREATED``.
* If the *Content-Range* for a request covers part of the share that has already been uploaded the response is ``CONFLICT``.
  The response body indicates the range of share data that has yet to be uploaded.
  That is::
@@ -514,6 +511,25 @@ Clients should upload chunks in re-assembly order.

    ]
  }

* When the chunk that completes the share is successfully uploaded the response is ``CREATED``.
* If the *Content-Range* for a request covers part of the share that has already been uploaded,
  and the data does not match the already written data,
  the response is ``CONFLICT``.
  At this point the only thing to do is abort the upload and start from scratch (see below).
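
As a purely illustrative, non-normative sketch of how a client might drive these responses, assuming a hypothetical ``upload_chunk`` helper that issues whatever HTTP request this document defines for writing share data with a *Content-Range* header and returns the response status code::

    OK, CREATED, CONFLICT = 200, 201, 409

    def upload_share(upload_chunk, share_data, chunk_size=128 * 1024):
        """Upload share_data in re-assembly order, one chunk at a time."""
        total = len(share_data)
        for offset in range(0, total, chunk_size):
            status = upload_chunk(offset, share_data[offset:offset + chunk_size], total)
            if status == OK:
                continue   # the share is not complete yet; keep uploading
            elif status == CREATED:
                return     # this chunk completed the share
            elif status == CONFLICT:
                # Overlapping data disagreed with what was already written; the only
                # recovery is to abort this upload and start from scratch (see below).
                raise RuntimeError("conflicting overlapping write; abort and restart")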

``PUT /v1/immutable/:storage_index/:share_number/abort``
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

This cancels an *in-progress* upload.

The response code:

* When the upload is still in progress and therefore the abort has succeeded,
  the response is ``OK``.
  Future uploads can start from scratch with no pre-existing upload state stored on the server.
* If the upload has already finished, the response is 405 (Method Not Allowed)
  and no change is made.
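
For illustration only (the text above is normative), a client might invoke this endpoint as follows, assuming a hypothetical ``base_url`` and the Python ``requests`` library, with authentication omitted::

    import requests

    def abort_upload(base_url, storage_index, share_number):
        # PUT /v1/immutable/:storage_index/:share_number/abort
        response = requests.put(
            "{}/v1/immutable/{}/{}/abort".format(base_url, storage_index, share_number)
        )
        if response.status_code == 200:
            return True    # upload was in progress and its state has been discarded
        if response.status_code == 405:
            return False   # upload already finished; nothing was changed
        response.raise_for_status()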

``POST /v1/immutable/:storage_index/:share_number/corrupt``
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
newsfragments/3801.bugfix (new file, 1 line)
@@ -0,0 +1 @@
When uploading an immutable, overlapping writes that include conflicting data are rejected. In practice, this likely didn't happen in real-world usage.

nix/collections-extended.nix (new file, 19 lines)
@@ -0,0 +1,19 @@
{ lib, buildPythonPackage, fetchPypi }:
buildPythonPackage rec {
  pname = "collections-extended";
  version = "1.0.3";

  src = fetchPypi {
    inherit pname version;
    sha256 = "0lb69x23asd68n0dgw6lzxfclavrp2764xsnh45jm97njdplznkw";
  };

  # Tests aren't in tarball, for 1.0.3 at least.
  doCheck = false;

  meta = with lib; {
    homepage = https://github.com/mlenzen/collections-extended;
    description = "Extra Python Collections - bags (multisets), setlists (unique list / indexed set), RangeMap and IndexedDict";
    license = licenses.asl20;
  };
}
@@ -18,6 +18,9 @@ self: super: {

      # Need a newer version of Twisted, too.
      twisted = python-super.callPackage ./twisted.nix { };

      # collections-extended is not part of nixpkgs at this time.
      collections-extended = python-super.callPackage ./collections-extended.nix { };
    };
  };
}
@@ -97,7 +97,7 @@ EOF
        setuptoolsTrial pyasn1 zope_interface
        service-identity pyyaml magic-wormhole treq
        eliot autobahn cryptography netifaces setuptools
        future pyutil distro configparser
        future pyutil distro configparser collections-extended
      ];

    checkInputs = with python.pkgs; [
setup.py (3 lines changed)
@@ -137,6 +137,9 @@ install_requires = [

    # Backported configparser for Python 2:
    "configparser ; python_version < '3.0'",

    # For the RangeMap datastructure.
    "collections-extended",
]

setup_requires = [
@@ -53,6 +53,14 @@ LeaseRenewSecret = Hash # used to protect lease renewal requests
LeaseCancelSecret = Hash # was used to protect lease cancellation requests


class DataTooLargeError(Exception):
    """The write went past the expected size of the bucket."""


class ConflictingWriteError(Exception):
    """Two writes happened to same immutable with different data."""


class RIBucketWriter(RemoteInterface):
    """ Objects of this kind live on the server side. """
    def write(offset=Offset, data=ShareData):
@@ -13,8 +13,9 @@ if PY2:

import os.path
from allmydata.util import base32

class DataTooLargeError(Exception):
    pass
# Backwards compatibility.
from allmydata.interfaces import DataTooLargeError  # noqa: F401

class UnknownMutableContainerVersionError(Exception):
    pass
class UnknownImmutableContainerVersionError(Exception):
@@ -13,16 +13,20 @@ if PY2:

import os, stat, struct, time

from collections_extended import RangeMap

from foolscap.api import Referenceable

from zope.interface import implementer
from allmydata.interfaces import RIBucketWriter, RIBucketReader
from allmydata.interfaces import (
    RIBucketWriter, RIBucketReader, ConflictingWriteError,
    DataTooLargeError,
)
from allmydata.util import base32, fileutil, log
from allmydata.util.assertutil import precondition
from allmydata.util.hashutil import timing_safe_compare
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.common import UnknownImmutableContainerVersionError, \
     DataTooLargeError
from allmydata.storage.common import UnknownImmutableContainerVersionError

# each share file (in storage/shares/$SI/$SHNUM) contains lease information
# and share data. The share data is accessed by RIBucketWriter.write and
@@ -217,6 +221,7 @@ class BucketWriter(Referenceable):  # type: ignore # warner/foolscap#78
        # also, add our lease to the file now, so that other ones can be
        # added by simultaneous uploaders
        self._sharefile.add_lease(lease_info)
        self._already_written = RangeMap()

    def allocated_size(self):
        return self._max_size
@@ -226,7 +231,20 @@ class BucketWriter(Referenceable):  # type: ignore # warner/foolscap#78
        precondition(not self.closed)
        if self.throw_out_all_data:
            return

        # Make sure we're not conflicting with existing data:
        end = offset + len(data)
        for (chunk_start, chunk_stop, _) in self._already_written.ranges(offset, end):
            chunk_len = chunk_stop - chunk_start
            actual_chunk = self._sharefile.read_share_data(chunk_start, chunk_len)
            writing_chunk = data[chunk_start - offset:chunk_stop - offset]
            if actual_chunk != writing_chunk:
                raise ConflictingWriteError(
                    "Chunk {}-{} doesn't match already written data.".format(chunk_start, chunk_stop)
                )
        self._sharefile.write_share_data(offset, data)

        self._already_written.set(True, offset, end)
        self.ss.add_latency("write", time.time() - start)
        self.ss.count("write")
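The bookkeeping above uses ``RangeMap`` from the newly added ``collections-extended`` dependency to remember which byte ranges have already been written. The following is a rough standalone sketch of the same idea (not tahoe-lafs code; an in-memory ``bytearray`` stands in for the share file)::

    from collections_extended import RangeMap

    class OverlapChecker(object):
        """Toy model of the conflict check: reject overlapping writes whose data differs."""

        def __init__(self, size):
            self._data = bytearray(size)
            self._written = RangeMap()

        def write(self, offset, data):
            end = offset + len(data)
            # ranges() yields the previously recorded spans that fall within [offset, end).
            for (start, stop, _) in self._written.ranges(offset, end):
                if bytes(self._data[start:stop]) != data[start - offset:stop - offset]:
                    raise ValueError("conflicting overlapping write")
            self._data[offset:end] = data
            self._written.set(True, offset, end)

For example, after ``c = OverlapChecker(10)`` and ``c.write(0, b"abc")``, a matching overlap such as ``c.write(1, b"bc")`` succeeds, while ``c.write(1, b"xx")`` raises.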
@@ -24,7 +24,7 @@ from testtools import skipIf

from twisted.internet.defer import inlineCallbacks

from foolscap.api import Referenceable
from foolscap.api import Referenceable, RemoteException

from allmydata.interfaces import IStorageServer
from .common_system import SystemTestMixin
@@ -248,12 +248,60 @@ class IStorageServerImmutableAPIsTestsMixin(object):
            (yield buckets[2].callRemote("read", 0, 1024)), b"3" * 512 + b"4" * 512
        )

    @skipIf(True, "https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3801")
    def test_overlapping_writes(self):
    @inlineCallbacks
    def test_non_matching_overlapping_writes(self):
        """
        The policy for overlapping writes is TBD:
        https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3801
        When doing overlapping writes in immutable uploads, non-matching writes
        fail.
        """
        storage_index, renew_secret, cancel_secret = (
            new_storage_index(),
            new_secret(),
            new_secret(),
        )
        (_, allocated) = yield self.storage_server.allocate_buckets(
            storage_index,
            renew_secret,
            cancel_secret,
            sharenums={0},
            allocated_size=30,
            canary=Referenceable(),
        )

        yield allocated[0].callRemote("write", 0, b"1" * 25)
        # Overlapping write that doesn't match:
        with self.assertRaises(RemoteException):
            yield allocated[0].callRemote("write", 20, b"2" * 10)

    @inlineCallbacks
    def test_matching_overlapping_writes(self):
        """
        When doing overlapping writes in immutable uploads, matching writes
        succeed.
        """
        storage_index, renew_secret, cancel_secret = (
            new_storage_index(),
            new_secret(),
            new_secret(),
        )
        (_, allocated) = yield self.storage_server.allocate_buckets(
            storage_index,
            renew_secret,
            cancel_secret,
            sharenums={0},
            allocated_size=25,
            canary=Referenceable(),
        )

        yield allocated[0].callRemote("write", 0, b"1" * 10)
        # Overlapping write that matches:
        yield allocated[0].callRemote("write", 5, b"1" * 20)
        yield allocated[0].callRemote("close")

        buckets = yield self.storage_server.get_buckets(storage_index)
        self.assertEqual(set(buckets.keys()), {0})

        self.assertEqual((yield buckets[0].callRemote("read", 0, 25)), b"1" * 25)

    @inlineCallbacks
    def test_get_buckets_skips_unfinished_buckets(self):
@@ -8,7 +8,7 @@ from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

from future.utils import native_str, PY2, bytes_to_native_str
from future.utils import native_str, PY2, bytes_to_native_str, bchr
if PY2:
    from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min  # noqa: F401
from six import ensure_str
@@ -20,12 +20,15 @@ import stat
import struct
import shutil
import gc
from uuid import uuid4

from twisted.trial import unittest

from twisted.internet import defer
from twisted.internet.task import Clock

from hypothesis import given, strategies

import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32
@@ -33,7 +36,7 @@ from allmydata.storage.server import StorageServer, DEFAULT_RENEWAL_TIME
from allmydata.storage.shares import get_share_file
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import BucketWriter, BucketReader, ShareFile
from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
from allmydata.storage.common import storage_index_to_dir, \
     UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError, \
     si_b2a, si_a2b
from allmydata.storage.lease import LeaseInfo
@@ -47,7 +50,9 @@ from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \
                                     SIGNATURE_SIZE, \
                                     VERIFICATION_KEY_SIZE, \
                                     SHARE_HASH_CHAIN_SIZE
from allmydata.interfaces import BadWriteEnablerError
from allmydata.interfaces import (
    BadWriteEnablerError, DataTooLargeError, ConflictingWriteError,
)
from allmydata.test.no_network import NoNetworkServer
from allmydata.storage_client import (
    _StorageServer,
@@ -147,6 +152,91 @@ class Bucket(unittest.TestCase):
        self.failUnlessEqual(br.remote_read(25, 25), b"b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), b"c"*7)

    def test_write_past_size_errors(self):
        """Writing beyond the size of the bucket throws an exception."""
        for (i, (offset, length)) in enumerate([(0, 201), (10, 191), (202, 34)]):
            incoming, final = self.make_workdir(
                "test_write_past_size_errors-{}".format(i)
            )
            bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                              FakeCanary())
            with self.assertRaises(DataTooLargeError):
                bw.remote_write(offset, b"a" * length)

    @given(
        maybe_overlapping_offset=strategies.integers(min_value=0, max_value=98),
        maybe_overlapping_length=strategies.integers(min_value=1, max_value=100),
    )
    def test_overlapping_writes_ok_if_matching(
            self, maybe_overlapping_offset, maybe_overlapping_length
    ):
        """
        Writes that overlap with previous writes are OK when the content is the
        same.
        """
        length = 100
        expected_data = b"".join(bchr(i) for i in range(100))
        incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4()))
        bw = BucketWriter(
            self, incoming, final, length, self.make_lease(),
            FakeCanary()
        )
        # Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
        bw.remote_write(10, expected_data[10:20])
        bw.remote_write(30, expected_data[30:40])
        bw.remote_write(50, expected_data[50:60])
        # Then, an overlapping write but with matching data:
        bw.remote_write(
            maybe_overlapping_offset,
            expected_data[
                maybe_overlapping_offset:maybe_overlapping_offset + maybe_overlapping_length
            ]
        )
        # Now fill in the holes:
        bw.remote_write(0, expected_data[0:10])
        bw.remote_write(20, expected_data[20:30])
        bw.remote_write(40, expected_data[40:50])
        bw.remote_write(60, expected_data[60:])
        bw.remote_close()

        br = BucketReader(self, bw.finalhome)
        self.assertEqual(br.remote_read(0, length), expected_data)

    @given(
        maybe_overlapping_offset=strategies.integers(min_value=0, max_value=98),
        maybe_overlapping_length=strategies.integers(min_value=1, max_value=100),
    )
    def test_overlapping_writes_not_ok_if_different(
            self, maybe_overlapping_offset, maybe_overlapping_length
    ):
        """
        Writes that overlap with previous writes fail with an exception if the
        contents don't match.
        """
        length = 100
        incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4()))
        bw = BucketWriter(
            self, incoming, final, length, self.make_lease(),
            FakeCanary()
        )
        # Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
        bw.remote_write(10, b"1" * 10)
        bw.remote_write(30, b"1" * 10)
        bw.remote_write(50, b"1" * 10)
        # Then, write something that might overlap with some of them, but
        # conflicts. Then fill in holes left by first three writes. Conflict is
        # inevitable.
        with self.assertRaises(ConflictingWriteError):
            bw.remote_write(
                maybe_overlapping_offset,
                b'X' * min(maybe_overlapping_length, length - maybe_overlapping_offset),
            )
            bw.remote_write(0, b"1" * 10)
            bw.remote_write(20, b"1" * 10)
            bw.remote_write(40, b"1" * 10)
            bw.remote_write(60, b"1" * 40)

    def test_read_past_end_of_share_data(self):
        # test vector for immutable files (hard-coded contents of an immutable share
        # file):