Split out Foolscap code from BucketReader/Writer.

This commit is contained in:
Itamar Turner-Trauring 2021-12-02 10:29:52 -05:00
parent f7cb4d5c92
commit 476c41e49e
2 changed files with 62 additions and 16 deletions

View File

@ -230,8 +230,10 @@ class ShareFile(object):
return space_freed
@implementer(RIBucketWriter)
class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
class BucketWriter(object):
"""
Keep track of the process of writing to a ShareFile.
"""
def __init__(self, ss, incominghome, finalhome, max_size, lease_info, clock):
self.ss = ss
@ -251,7 +253,7 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
def allocated_size(self):
return self._max_size
def remote_write(self, offset, data):
def write(self, offset, data):
# Delay the timeout, since we received data:
self._timeout.reset(30 * 60)
start = self._clock.seconds()
@ -275,9 +277,6 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
self.ss.add_latency("write", self._clock.seconds() - start)
self.ss.count("write")
def remote_close(self):
self.close()
def close(self):
precondition(not self.closed)
self._timeout.cancel()
@ -329,13 +328,10 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
facility="tahoe.storage", level=log.UNUSUAL)
self.abort()
def remote_abort(self):
def abort(self):
log.msg("storage: aborting sharefile %s" % self.incominghome,
facility="tahoe.storage", level=log.UNUSUAL)
self.abort()
self.ss.count("abort")
def abort(self):
if self.closed:
return
@ -358,8 +354,28 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
self._timeout.cancel()
@implementer(RIBucketReader)
class BucketReader(Referenceable): # type: ignore # warner/foolscap#78
@implementer(RIBucketWriter)
class FoolscapBucketWriter(Referenceable): # type: ignore # warner/foolscap#78
"""
Foolscap-specific BucketWriter.
"""
def __init__(self, bucket_writer):
self._bucket_writer = bucket_writer
def remote_write(self, offset, data):
return self._bucket_writer.write(offset, data)
def remote_close(self):
return self._bucket_writer.close()
def remote_abort(self):
return self._bucket_writer.abort()
class BucketReader(object):
"""
Manage the process for reading from a ``ShareFile``.
"""
def __init__(self, ss, sharefname, storage_index=None, shnum=None):
self.ss = ss
@ -374,15 +390,31 @@ class BucketReader(Referenceable): # type: ignore # warner/foolscap#78
),
self.shnum)
def remote_read(self, offset, length):
def read(self, offset, length):
start = time.time()
data = self._share_file.read_share_data(offset, length)
self.ss.add_latency("read", time.time() - start)
self.ss.count("read")
return data
def remote_advise_corrupt_share(self, reason):
def advise_corrupt_share(self, reason):
return self.ss.advise_corrupt_share(b"immutable",
self.storage_index,
self.shnum,
reason)
@implementer(RIBucketReader)
class FoolscapBucketReader(Referenceable): # type: ignore # warner/foolscap#78
"""
Foolscap wrapper for ``BucketReader``
"""
def __init__(self, bucket_reader):
self._bucket_reader = bucket_reader
def remote_read(self, offset, length):
return self._bucket_reader.read(offset, length)
def remote_advise_corrupt_share(self, reason):
return self._bucket_reader.advise_corrupt_share(reason)

View File

@ -33,7 +33,10 @@ from allmydata.storage.lease import LeaseInfo
from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
create_mutable_sharefile
from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE
from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
from allmydata.storage.immutable import (
ShareFile, BucketWriter, BucketReader, FoolscapBucketWriter,
FoolscapBucketReader,
)
from allmydata.storage.crawler import BucketCountingCrawler
from allmydata.storage.expirer import LeaseCheckingCrawler
@ -782,10 +785,18 @@ class FoolscapStorageServer(Referenceable): # type: ignore # warner/foolscap#78
storage_index, renew_secret, cancel_secret, sharenums, allocated_size,
owner_num=owner_num, renew_leases=True,
)
# Abort BucketWriters if disconnection happens.
for bw in bucketwriters.values():
disconnect_marker = canary.notifyOnDisconnect(bw.disconnected)
self._bucket_writer_disconnect_markers[bw] = (canary, disconnect_marker)
# Wrap BucketWriters with Foolscap adapter:
bucketwriters = {
k: FoolscapBucketWriter(bw)
for (k, bw) in bucketwriters.items()
}
return alreadygot, bucketwriters
def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
@ -796,7 +807,10 @@ class FoolscapStorageServer(Referenceable): # type: ignore # warner/foolscap#78
return self._server.renew_lease(storage_index, renew_secret)
def remote_get_buckets(self, storage_index):
return self._server.get_buckets(storage_index)
return {
k: FoolscapBucketReader(bucket)
for (k, bucket) in self._server.get_buckets(storage_index).items()
}
def remote_slot_testv_and_readv_and_writev(self, storage_index,
secrets,