Prevent conflicting overlapping writes.

This commit is contained in:
Itamar Turner-Trauring 2021-09-24 11:54:08 -04:00
parent cae989e8de
commit 6ef3811112
3 changed files with 25 additions and 4 deletions

View File

@ -97,7 +97,7 @@ EOF
setuptoolsTrial pyasn1 zope_interface
service-identity pyyaml magic-wormhole treq
eliot autobahn cryptography netifaces setuptools
future pyutil distro configparser
future pyutil distro configparser collections-extended
];
checkInputs = with python.pkgs; [

View File

@ -137,6 +137,9 @@ install_requires = [
# Backported configparser for Python 2:
"configparser ; python_version < '3.0'",
# For the RangeMap data structure.
"collections-extended",
]
setup_requires = [

View File

@ -13,16 +13,20 @@ if PY2:
import os, stat, struct, time
from collections_extended import RangeMap
from foolscap.api import Referenceable
from zope.interface import implementer
from allmydata.interfaces import RIBucketWriter, RIBucketReader
from allmydata.interfaces import (
RIBucketWriter, RIBucketReader, ConflictingWriteError,
DataTooLargeError,
)
from allmydata.util import base32, fileutil, log
from allmydata.util.assertutil import precondition
from allmydata.util.hashutil import timing_safe_compare
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.common import UnknownImmutableContainerVersionError, \
DataTooLargeError
from allmydata.storage.common import UnknownImmutableContainerVersionError
# each share file (in storage/shares/$SI/$SHNUM) contains lease information
# and share data. The share data is accessed by RIBucketWriter.write and
@ -217,6 +221,7 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
# also, add our lease to the file now, so that other ones can be
# added by simultaneous uploaders
self._sharefile.add_lease(lease_info)
self._already_written = RangeMap()
def allocated_size(self):
    """Return the size recorded for this writer in ``self._max_size``.

    The value is handed back unchanged; nothing is computed here.
    NOTE(review): presumably a byte count fixed when the bucket was
    allocated -- confirm against the constructor.
    """
    max_size = self._max_size
    return max_size
@ -226,7 +231,20 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
precondition(not self.closed)
if self.throw_out_all_data:
return
# Make sure we're not conflicting with existing data:
end = offset + len(data)
for (chunk_start, chunk_stop, _) in self._already_written.ranges(offset, end):
chunk_len = chunk_stop - chunk_start
actual_chunk = self._sharefile.read_share_data(chunk_start, chunk_len)
writing_chunk = data[chunk_start - offset:chunk_stop - offset]
if actual_chunk != writing_chunk:
raise ConflictingWriteError(
"Chunk {}-{} doesn't match already written data.".format(chunk_start, chunk_stop)
)
self._sharefile.write_share_data(offset, data)
self._already_written.set(True, offset, end)
self.ss.add_latency("write", time.time() - start)
self.ss.count("write")