Add tests for announcements for plugins

And a basic implementation
This commit is contained in:
Jean-Paul Calderone 2019-06-14 16:34:10 -04:00
parent 9608404b6e
commit 646cd452b9
5 changed files with 310 additions and 4 deletions

View File

@ -3,7 +3,11 @@ from base64 import urlsafe_b64encode
from functools import partial
from errno import ENOENT, EPERM
import attr
from zope.interface import implementer
from twisted.plugin import (
getPlugins,
)
from twisted.internet import reactor, defer
from twisted.application import service
from twisted.application.internet import TimerService
@ -30,7 +34,14 @@ from allmydata.util.i2p_provider import create as create_i2p_provider
from allmydata.util.tor_provider import create as create_tor_provider
from allmydata.stats import StatsProvider
from allmydata.history import History
from allmydata.interfaces import IStatsProducer, SDMF_VERSION, MDMF_VERSION, DEFAULT_MAX_SEGMENT_SIZE
from allmydata.interfaces import (
IStatsProducer,
SDMF_VERSION,
MDMF_VERSION,
DEFAULT_MAX_SEGMENT_SIZE,
IFoolscapStoragePlugin,
IAnnounceableStorageServer,
)
from allmydata.nodemaker import NodeMaker
from allmydata.blacklist import Blacklist
from allmydata import node
@ -394,6 +405,15 @@ def create_storage_farm_broker(config, default_connection_handlers, foolscap_con
return sb
@implementer(IAnnounceableStorageServer)
@attr.s
class AnnounceableStorageServer(object):
announcement = attr.ib()
storage_server = attr.ib()
@implementer(IStatsProducer)
class _Client(node.Node, pollmixin.PollMixin):
@ -597,6 +617,117 @@ class _Client(node.Node, pollmixin.PollMixin):
for ic in self.introducer_clients:
ic.publish("storage", ann, self._node_private_key)
self._init_storage_plugins()
def _init_storage_plugins(self):
"""
Load, register, and announce any configured storage plugins.
"""
storage_plugin_names = self._get_enabled_storage_plugin_names()
plugins = list(self._collect_storage_plugins(storage_plugin_names))
# TODO What if some names aren't found?
announceable_storage_servers = self._create_plugin_storage_servers(plugins)
self._enable_storage_servers(announceable_storage_servers)
def _get_enabled_storage_plugin_names(self):
"""
Get the names of storage plugins that are enabled in the configuration.
"""
return {
self.config.get_config(
"storage", "plugins", b""
).decode("ascii")
}
def _collect_storage_plugins(self, storage_plugin_names):
"""
Get the storage plugins with names matching those given.
"""
return list(
plugin
for plugin
in getPlugins(IFoolscapStoragePlugin)
if plugin.name in storage_plugin_names
)
def _create_plugin_storage_servers(self, plugins):
"""
Cause each storage plugin to instantiate its storage server and return
them all.
"""
return list(
self._add_to_announcement(
{u"name": plugin.name},
plugin.get_storage_server(
self._get_storage_plugin_configuration(plugin.name),
lambda: self.getServiceNamed(StorageServer.name)
),
)
for plugin
in plugins
)
def _add_to_announcement(self, information, announceable_storage_server):
"""
Create a new ``AnnounceableStorageServer`` based on
``announceable_storage_server`` with ``information`` added to its
``announcement``.
"""
updated_announcement = announceable_storage_server.announcement.copy()
updated_announcement.update(information)
return AnnounceableStorageServer(
updated_announcement,
announceable_storage_server.storage_server,
)
def _get_storage_plugin_configuration(self, storage_plugin_name):
return dict(
# Need to reach past the Tahoe-LAFS-supplied wrapper around the
# underlying ConfigParser...
self.config.config.items("storageserver.plugins." + storage_plugin_name)
)
def _enable_storage_servers(self, announceable_storage_servers):
"""
Register and announce the given storage servers.
"""
for announceable in announceable_storage_servers:
self._enable_storage_server(announceable)
def _enable_storage_server(self, announceable_storage_server):
"""
Register and announce a storage server.
"""
furl_file = self.config.get_private_path(
"storage-plugin.{}.furl".format(
# Oops, why don't I have a better handle on this value?
announceable_storage_server.announcement[u"name"],
),
)
furl = self.tub.registerReference(
announceable_storage_server.storage_server,
furlFile=furl_file.encode(get_filesystem_encoding()),
)
announceable_storage_server = self._add_to_announcement(
{u"storage-server-FURL": furl},
announceable_storage_server,
)
for ic in self.introducer_clients:
ic.publish(
"storage",
announceable_storage_server.announcement,
self._node_key,
)
def init_client(self):
helper_furl = self.config.get_config("client", "helper.furl", None)
if helper_furl in ("None", ""):

View File

@ -77,3 +77,14 @@ def matches_base32():
Match any base32 encoded byte string.
"""
return AfterPreprocessing(base32.a2b, Always())
class MatchesSameElements(object):
"""
Match if the two-tuple value given contains two elements that are equal to
each other.
"""
def match(self, value):
left, right = value
return Equals(left).match(right)

View File

@ -2,4 +2,10 @@ from allmydata.test.common import (
AdoptedServerPort,
)
from allmydata.test.storage_plugin import (
DummyStorage,
)
adoptedEndpointParser = AdoptedServerPort()
dummyStorage = DummyStorage()

View File

@ -0,0 +1,58 @@
"""
A storage server plugin the test suite can use to validate the
functionality.
"""
import attr
from zope.interface import (
implementer,
)
from foolscap.api import (
RemoteInterface,
)
from allmydata.interfaces import (
IFoolscapStoragePlugin,
)
from allmydata.client import (
AnnounceableStorageServer,
)
class RIDummy(RemoteInterface):
__remote_name__ = "RIDummy.tahoe.allmydata.com"
def just_some_method():
"""
Just some method so there is something callable on this object. We won't
pretend to actually offer any storage capabilities.
"""
@implementer(IFoolscapStoragePlugin)
class DummyStorage(object):
name = u"tahoe-lafs-dummy-v1"
def get_storage_server(self, configuration, get_anonymous_storage_server):
return AnnounceableStorageServer(
announcement={u"value": configuration[u"some"]},
storage_server=DummyStorageServer(get_anonymous_storage_server),
)
def get_storage_client(self, configuration, announcement):
pass
@implementer(RIDummy)
@attr.s(cmp=True, hash=True)
class DummyStorageServer(object):
# TODO Requirement of some interface that instances be hashable
get_anonymous_storage_server = attr.ib(cmp=False)
def remote_just_some_method(self):
pass

View File

@ -25,6 +25,8 @@ from testtools.matchers import (
Equals,
AfterPreprocessing,
MatchesListwise,
MatchesDict,
MatchesStructure,
)
from testtools.twistedsupport import (
succeeded,
@ -39,7 +41,11 @@ from allmydata.node import config_from_string
from allmydata.frontends.auth import NeedRootcapLookupScheme
from allmydata import client
from allmydata.storage_client import StorageFarmBroker
from allmydata.util import base32, fileutil, encodingutil
from allmydata.util import (
base32,
fileutil,
encodingutil,
)
from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.interfaces import IFilesystemNode, IFileNode, \
IImmutableFileNode, IMutableFileNode, IDirectoryNode
@ -47,9 +53,13 @@ from foolscap.api import flushEventualQueue
import allmydata.test.common_util as testutil
from .common import (
SyncTestCase,
UseTestPlugins,
)
from .matchers import (
MatchesNodePublicKey,
MatchesSameElements,
matches_anonymous_storage_announcement,
matches_furl,
)
BASECONFIG = ("[client]\n"
@ -1068,17 +1078,24 @@ class StorageAnnouncementTests(SyncTestCase):
create_node_dir(self.basedir, u"")
def get_config(self, storage_enabled):
def get_config(self, storage_enabled, more_storage=b"", more_sections=b""):
return b"""
[node]
tub.location = tcp:192.0.2.0:1234
[storage]
enabled = {storage_enabled}
{more_storage}
[client]
introducer.furl = pb://abcde@nowhere/fake
""".format(storage_enabled=storage_enabled)
{more_sections}
""".format(
storage_enabled=storage_enabled,
more_storage=more_storage,
more_sections=more_sections,
)
def test_no_announcement(self):
@ -1126,3 +1143,86 @@ introducer.furl = pb://abcde@nowhere/fake
]),
)),
)
def test_single_storage_plugin_announcement(self):
"""
The announcement from a single enabled storage plugin is published when
storage is enabled.
"""
self.useFixture(UseTestPlugins())
config = config_from_string(
self.basedir,
u"tub.port",
self.get_config(
storage_enabled=True,
more_storage=b"plugins=tahoe-lafs-dummy-v1",
more_sections=(
b"[storageserver.plugins.tahoe-lafs-dummy-v1]\n"
b"some = thing\n"
),
),
)
matches_dummy_announcement = MatchesStructure(
service_name=Equals("storage"),
ann=MatchesDict({
# Everyone gets a name and a fURL added to their announcement.
u"name": Equals(u"tahoe-lafs-dummy-v1"),
u"storage-server-FURL": matches_furl(),
# The plugin can contribute things, too.
u"value": Equals(u"thing"),
}),
signing_key=MatchesNodePublicKey(self.basedir),
)
self.assertThat(
client.create_client_from_config(config, introducer_factory=MemoryIntroducerClient),
succeeded(AfterPreprocessing(
get_published_announcements,
MatchesListwise([
matches_anonymous_storage_announcement(self.basedir),
matches_dummy_announcement,
]),
)),
)
def test_stable_storage_server_furl(self):
"""
The value for the ``storage-server-FURL`` item in the announcement for a
particular storage server plugin is stable across different node
instantiations.
"""
self.useFixture(UseTestPlugins())
config = config_from_string(
self.basedir,
u"tub.port",
self.get_config(
storage_enabled=True,
more_storage=b"plugins=tahoe-lafs-dummy-v1",
more_sections=(
b"[storageserver.plugins.tahoe-lafs-dummy-v1]\n"
b"some = thing\n"
),
),
)
node_a = client.create_client_from_config(
config,
introducer_factory=MemoryIntroducerClient,
)
node_b = client.create_client_from_config(
config,
introducer_factory=MemoryIntroducerClient,
)
self.assertThat(
defer.gatherResults([node_a, node_b]),
succeeded(AfterPreprocessing(
lambda (a, b): (
get_published_announcements(a),
get_published_announcements(b),
),
MatchesSameElements(),
)),
)