mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-06-19 07:48:11 +00:00
Hook up the new FoolscapStorageServer, and fix enough bugs, such that almost all
end-to-end and integration tests pass.
This commit is contained in:
@ -36,7 +36,7 @@ from twisted.python.filepath import FilePath
|
|||||||
import allmydata
|
import allmydata
|
||||||
from allmydata.crypto import rsa, ed25519
|
from allmydata.crypto import rsa, ed25519
|
||||||
from allmydata.crypto.util import remove_prefix
|
from allmydata.crypto.util import remove_prefix
|
||||||
from allmydata.storage.server import StorageServer
|
from allmydata.storage.server import StorageServer, FoolscapStorageServer
|
||||||
from allmydata import storage_client
|
from allmydata import storage_client
|
||||||
from allmydata.immutable.upload import Uploader
|
from allmydata.immutable.upload import Uploader
|
||||||
from allmydata.immutable.offloaded import Helper
|
from allmydata.immutable.offloaded import Helper
|
||||||
@ -834,7 +834,7 @@ class _Client(node.Node, pollmixin.PollMixin):
|
|||||||
|
|
||||||
if anonymous_storage_enabled(self.config):
|
if anonymous_storage_enabled(self.config):
|
||||||
furl_file = self.config.get_private_path("storage.furl").encode(get_filesystem_encoding())
|
furl_file = self.config.get_private_path("storage.furl").encode(get_filesystem_encoding())
|
||||||
furl = self.tub.registerReference(ss, furlFile=furl_file)
|
furl = self.tub.registerReference(FoolscapStorageServer(ss), furlFile=furl_file)
|
||||||
announcement["anonymous-storage-FURL"] = furl
|
announcement["anonymous-storage-FURL"] = furl
|
||||||
|
|
||||||
enabled_storage_servers = self._enable_storage_servers(
|
enabled_storage_servers = self._enable_storage_servers(
|
||||||
|
@ -382,7 +382,7 @@ class BucketReader(Referenceable): # type: ignore # warner/foolscap#78
|
|||||||
return data
|
return data
|
||||||
|
|
||||||
def remote_advise_corrupt_share(self, reason):
|
def remote_advise_corrupt_share(self, reason):
|
||||||
return self.ss.remote_advise_corrupt_share(b"immutable",
|
return self.ss.advise_corrupt_share(b"immutable",
|
||||||
self.storage_index,
|
self.storage_index,
|
||||||
self.shnum,
|
self.shnum,
|
||||||
reason)
|
reason)
|
||||||
|
@ -127,6 +127,9 @@ class StorageServer(service.MultiService):
|
|||||||
# Map in-progress filesystem path -> BucketWriter:
|
# Map in-progress filesystem path -> BucketWriter:
|
||||||
self._bucket_writers = {} # type: Dict[str,BucketWriter]
|
self._bucket_writers = {} # type: Dict[str,BucketWriter]
|
||||||
|
|
||||||
|
# These callables will be called with BucketWriters that closed:
|
||||||
|
self._call_on_bucket_writer_close = []
|
||||||
|
|
||||||
def stopService(self):
|
def stopService(self):
|
||||||
# Cancel any in-progress uploads:
|
# Cancel any in-progress uploads:
|
||||||
for bw in list(self._bucket_writers.values()):
|
for bw in list(self._bucket_writers.values()):
|
||||||
@ -405,9 +408,14 @@ class StorageServer(service.MultiService):
|
|||||||
if self.stats_provider:
|
if self.stats_provider:
|
||||||
self.stats_provider.count('storage_server.bytes_added', consumed_size)
|
self.stats_provider.count('storage_server.bytes_added', consumed_size)
|
||||||
del self._bucket_writers[bw.incominghome]
|
del self._bucket_writers[bw.incominghome]
|
||||||
if bw in self._bucket_writer_disconnect_markers:
|
for handler in self._call_on_bucket_writer_close:
|
||||||
canary, disconnect_marker = self._bucket_writer_disconnect_markers.pop(bw)
|
handler(bw)
|
||||||
canary.dontNotifyOnDisconnect(disconnect_marker)
|
|
||||||
|
def register_bucket_writer_close_handler(self, handler):
|
||||||
|
"""
|
||||||
|
The handler will be called with any ``BucketWriter`` that closes.
|
||||||
|
"""
|
||||||
|
self._call_on_bucket_writer_close.append(handler)
|
||||||
|
|
||||||
def _get_bucket_shares(self, storage_index):
|
def _get_bucket_shares(self, storage_index):
|
||||||
"""Return a list of (shnum, pathname) tuples for files that hold
|
"""Return a list of (shnum, pathname) tuples for files that hold
|
||||||
@ -755,9 +763,15 @@ class FoolscapStorageServer(Referenceable): # type: ignore # warner/foolscap#78
|
|||||||
# Canaries and disconnect markers for BucketWriters created via Foolscap:
|
# Canaries and disconnect markers for BucketWriters created via Foolscap:
|
||||||
self._bucket_writer_disconnect_markers = {} # type: Dict[BucketWriter,Tuple[IRemoteReference, object]]
|
self._bucket_writer_disconnect_markers = {} # type: Dict[BucketWriter,Tuple[IRemoteReference, object]]
|
||||||
|
|
||||||
|
self._server.register_bucket_writer_close_handler(self._bucket_writer_closed)
|
||||||
|
|
||||||
|
def _bucket_writer_closed(self, bw):
|
||||||
|
if bw in self._bucket_writer_disconnect_markers:
|
||||||
|
canary, disconnect_marker = self._bucket_writer_disconnect_markers.pop(bw)
|
||||||
|
canary.dontNotifyOnDisconnect(disconnect_marker)
|
||||||
|
|
||||||
def remote_get_version(self):
|
def remote_get_version(self):
|
||||||
return self.get_version()
|
return self._server.get_version()
|
||||||
|
|
||||||
def remote_allocate_buckets(self, storage_index,
|
def remote_allocate_buckets(self, storage_index,
|
||||||
renew_secret, cancel_secret,
|
renew_secret, cancel_secret,
|
||||||
@ -797,7 +811,7 @@ class FoolscapStorageServer(Referenceable): # type: ignore # warner/foolscap#78
|
|||||||
)
|
)
|
||||||
|
|
||||||
def remote_slot_readv(self, storage_index, shares, readv):
|
def remote_slot_readv(self, storage_index, shares, readv):
|
||||||
return self._server.slot_readv(self, storage_index, shares, readv)
|
return self._server.slot_readv(storage_index, shares, readv)
|
||||||
|
|
||||||
def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
|
def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
|
||||||
reason):
|
reason):
|
||||||
|
Reference in New Issue
Block a user