refactor more code into grid.py

This commit is contained in:
meejah 2020-05-12 12:22:35 -06:00
parent a9fe12063a
commit c5fb2b5a8b
4 changed files with 223 additions and 72 deletions

View File

@ -36,7 +36,11 @@ from util import (
await_client_ready,
TahoeProcess,
)
import grid
from grid import (
create_port_allocator,
create_flog_gatherer,
create_grid,
)
# pytest customization hooks
@ -73,6 +77,12 @@ def reactor():
return _reactor
@pytest.fixture(scope='session')
@log_call(action_type=u"integration:port_allocator", include_result=False)
def port_allocator(reactor):
return create_port_allocator(start_port=45000)
@pytest.fixture(scope='session')
@log_call(action_type=u"integration:temp_dir", include_args=[])
def temp_dir(request):
@ -108,20 +118,23 @@ def flog_binary():
@log_call(action_type=u"integration:flog_gatherer", include_args=[])
def flog_gatherer(reactor, temp_dir, flog_binary, request):
fg = pytest_twisted.blockon(
grid.create_flog_gatherer(reactor, request, temp_dir, flog_binary)
create_flog_gatherer(reactor, request, temp_dir, flog_binary)
)
return fg
@pytest.fixture(scope='session')
@log_call(
action_type=u"integration:introducer",
include_args=["temp_dir", "flog_gatherer"],
include_result=False,
)
def introducer(reactor, temp_dir, flog_gatherer, request):
intro = pytest_twisted.blockon(grid.create_introducer(reactor, request, temp_dir, flog_gatherer))
return intro
@log_call(action_type=u"integration:grid", include_args=[])
def grid(reactor, request, temp_dir, flog_gatherer, port_allocator):
g = pytest_twisted.blockon(
create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator)
)
return g
@pytest.fixture(scope='session')
def introducer(grid):
return grid.introducer
@pytest.fixture(scope='session')
@ -206,26 +219,20 @@ def tor_introducer_furl(tor_introducer, temp_dir):
@pytest.fixture(scope='session')
@log_call(
action_type=u"integration:storage_nodes",
include_args=["temp_dir", "introducer_furl", "flog_gatherer"],
include_args=["grid"],
include_result=False,
)
def storage_nodes(reactor, temp_dir, introducer, introducer_furl, flog_gatherer, request):
def storage_nodes(grid):
nodes_d = []
# start all 5 nodes in parallel
for x in range(5):
name = 'node{}'.format(x)
web_port = 'tcp:{}:interface=localhost'.format(9990 + x)
nodes_d.append(
grid.create_storage_server(
reactor, request, temp_dir, introducer, flog_gatherer, name, web_port,
)
)
#nodes_d.append(grid.add_storage_node())
pytest_twisted.blockon(grid.add_storage_node())
nodes_status = pytest_twisted.blockon(DeferredList(nodes_d))
nodes = []
for ok, process in nodes_status:
assert ok, "Storage node creation failed: {}".format(process)
nodes.append(process)
return nodes
for ok, value in nodes_status:
assert ok, "Storage node creation failed: {}".format(value)
return grid.storage_servers
@pytest.fixture(scope='session')

View File

@ -12,10 +12,15 @@ from os import mkdir, listdir, environ
from os.path import join, exists
from tempfile import mkdtemp, mktemp
from eliot import (
log_call,
)
from twisted.python.procutils import which
from twisted.internet.defer import (
inlineCallbacks,
returnValue,
maybeDeferred,
)
from twisted.internet.task import (
deferLater,
@ -25,6 +30,13 @@ from twisted.internet.interfaces import (
IProcessProtocol,
IProtocol,
)
from twisted.internet.endpoints import (
TCP4ServerEndpoint,
)
from twisted.internet.protocol import (
Factory,
Protocol,
)
from util import (
_CollectOutputProtocol,
@ -35,7 +47,6 @@ from util import (
_run_node,
_cleanup_tahoe_process,
_tahoe_runner_optional_coverage,
await_client_ready,
TahoeProcess,
)
@ -165,6 +176,43 @@ def create_storage_server(reactor, request, temp_dir, introducer, flog_gatherer,
)
returnValue(storage)
@attr.s
class Client(object):
"""
Represents a Tahoe client
"""
process = attr.ib(
validator=attr.validators.instance_of(TahoeProcess)
)
protocol = attr.ib(
validator=attr.validators.provides(IProcessProtocol)
)
# XXX add stop / start / restart
# ...maybe "reconfig" of some kind?
@inlineCallbacks
def create_client(reactor, request, temp_dir, introducer, flog_gatherer, name, web_port,
needed=2, happy=3, total=4):
"""
Create a new storage server
"""
from util import _create_node
node_process = yield _create_node(
reactor, request, temp_dir, introducer.furl, flog_gatherer,
name, web_port, storage=False, needed=needed, happy=happy, total=total,
)
returnValue(
Client(
process=node_process,
protocol=node_process.transport._protocol,
)
)
@attr.s
class Introducer(object):
"""
@ -180,29 +228,27 @@ class Introducer(object):
furl = attr.ib()
_introducer_num = 0
@inlineCallbacks
def create_introducer(reactor, request, temp_dir, flog_gatherer):
@log_call(
action_type=u"integration:introducer",
include_args=["temp_dir", "flog_gatherer"],
include_result=False,
)
def create_introducer(reactor, request, temp_dir, flog_gatherer, port):
"""
Run a new Introducer and return an Introducer instance.
"""
global _introducer_num
config = (
'[node]\n'
'nickname = introducer{num}\n'
'nickname = introducer{port}\n'
'web.port = {port}\n'
'log_gatherer.furl = {log_furl}\n'
).format(
num=_introducer_num,
port=port,
log_furl=flog_gatherer.furl,
port=4560 + _introducer_num,
)
_introducer_num += 1
intro_dir = join(temp_dir, 'introducer{}'.format(_introducer_num))
print("making introducer", intro_dir, _introducer_num)
intro_dir = join(temp_dir, 'introducer{}'.format(port))
if not exists(intro_dir):
mkdir(intro_dir)
@ -268,9 +314,14 @@ class Grid(object):
Storage Servers.
"""
introducer = attr.ib(default=None)
flog_gatherer = attr.ib(default=None)
_reactor = attr.ib()
_request = attr.ib()
_temp_dir = attr.ib()
_port_allocator = attr.ib()
introducer = attr.ib()
flog_gatherer = attr.ib()
storage_servers = attr.ib(factory=list)
clients = attr.ib(factory=dict)
@storage_servers.validator
def check(self, attribute, value):
@ -279,3 +330,115 @@ class Grid(object):
raise ValueError(
"storage_servers must be StorageServer"
)
@inlineCallbacks
def add_storage_node(self):
"""
Creates a new storage node, returns a StorageServer instance
(which will already be added to our .storage_servers list)
"""
port = yield self._port_allocator()
print("make {}".format(port))
name = 'node{}'.format(port)
web_port = 'tcp:{}:interface=localhost'.format(port)
server = yield create_storage_server(
self._reactor,
self._request,
self._temp_dir,
self.introducer,
self.flog_gatherer,
name,
web_port,
)
self.storage_servers.append(server)
returnValue(server)
@inlineCallbacks
def add_client(self, name, needed=2, happy=3, total=4):
"""
Create a new client node
"""
port = yield self._port_allocator()
web_port = 'tcp:{}:interface=localhost'.format(port)
client = yield create_client(
self._reactor,
self._request,
self._temp_dir,
self.introducer,
self.flog_gatherer,
name,
web_port,
needed=needed,
happy=happy,
total=total,
)
self.clients[name] = client
returnValue(client)
# XXX THINK can we tie a whole *grid* to a single request? (I think
# that's all that makes sense)
@inlineCallbacks
def create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator):
"""
"""
intro_port = yield port_allocator()
introducer = yield create_introducer(reactor, request, temp_dir, flog_gatherer, intro_port)
grid = Grid(
reactor,
request,
temp_dir,
port_allocator,
introducer,
flog_gatherer,
)
returnValue(grid)
def create_port_allocator(start_port):
"""
Returns a new port-allocator .. which is a zero-argument function
that returns Deferreds that fire with new, sequential ports
starting at `start_port` skipping any that already appear to have
a listener.
There can still be a race against other processes allocating ports
-- between the time when we check the status of the port and when
our subprocess starts up. This *could* be mitigated by instructing
the OS to not randomly-allocate ports in some range, and then
using that range here (explicitly, ourselves).
NB once we're Python3-only this could be an async-generator
"""
port = [start_port - 1]
# import stays here to not interfere with reactor selection -- but
# maybe this function should be arranged to be called once from a
# fixture (with the reactor)?
from twisted.internet import reactor
class NothingProtocol(Protocol):
"""
I do nothing.
"""
def port_generator():
print("Checking port {}".format(port))
port[0] += 1
ep = TCP4ServerEndpoint(reactor, port[0], interface="localhost")
d = ep.listen(Factory.forProtocol(NothingProtocol))
def good(listening_port):
unlisten_d = maybeDeferred(listening_port.stopListening)
def return_port(_):
return port[0]
unlisten_d.addBoth(return_port)
return unlisten_d
def try_again(fail):
return port_generator()
d.addCallbacks(good, try_again)
return d
return port_generator

View File

@ -104,30 +104,16 @@ def test_remove_last_client(reactor, request):
@pytest_twisted.inlineCallbacks
def test_reject_storage_server(reactor, request, temp_dir, flog_gatherer):
def test_reject_storage_server(reactor, request, temp_dir, flog_gatherer, port_allocator):
"""
A client with happines=2 fails to upload to a Grid when it is
using Grid Manager and there is only 1 storage server with a valid
certificate.
"""
import grid
introducer = yield grid.create_introducer(reactor, request, temp_dir, flog_gatherer)
storage0 = yield grid.create_storage_server(
reactor, request, temp_dir, introducer, flog_gatherer,
name="gm_storage0",
web_port="tcp:9995:interface=localhost",
needed=2,
happy=2,
total=2,
)
storage1 = yield grid.create_storage_server(
reactor, request, temp_dir, introducer, flog_gatherer,
name="gm_storage1",
web_port="tcp:9996:interface=localhost",
needed=2,
happy=2,
total=2,
)
from grid import create_grid
grid = yield create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator)
storage0 = yield grid.add_storage_node()
storage1 = yield grid.add_storage_node()
gm_config = yield util.run_tahoe(
reactor, request, "grid-manager", "--config", "-", "create",
@ -147,12 +133,7 @@ def test_reject_storage_server(reactor, request, temp_dir, flog_gatherer):
)
assert json.loads(gm_config)['storage_servers'].keys() == ['storage0']
# XXX FIXME want a grid.create_client() or similar
diana = yield util._create_node(
reactor, request, temp_dir, introducer.furl, flog_gatherer, "diana",
web_port="tcp:9984:interface=localhost",
storage=False,
)
diana = yield grid.add_client("diana", needed=2, happy=2, total=2)
print("inserting certificate")
cert = yield util.run_tahoe(
@ -175,23 +156,23 @@ def test_reject_storage_server(reactor, request, temp_dir, flog_gatherer):
reactor, storage0.process.node_dir, request, None,
)
yield util.await_client_ready(diana, servers=2)
yield util.await_client_ready(diana.process, servers=2)
# now only one storage-server has the certificate .. configure
# diana to have the grid-manager certificate
config = configutil.get_config(join(diana.node_dir, "tahoe.cfg"))
config = configutil.get_config(join(diana.process.node_dir, "tahoe.cfg"))
config.add_section("grid_managers")
config.set("grid_managers", "test", ed25519.string_from_verifying_key(gm_pubkey))
with open(join(diana.node_dir, "tahoe.cfg"), "w") as f:
with open(join(diana.process.node_dir, "tahoe.cfg"), "w") as f:
config.write(f)
diana.transport.signalProcess('TERM')
yield diana.transport._protocol.exited
diana.process.transport.signalProcess('TERM')
yield diana.protocol.exited
diana = yield util._run_node(
reactor, diana._node_dir, request, None,
reactor, diana.process.node_dir, request, None,
)
yield util.await_client_ready(diana, servers=2)
yield util.await_client_ready(diana.process, servers=2)
# try to put something into the grid, which should fail (because
# diana has happy=2 but should only find storage0 to be acceptable
@ -199,7 +180,7 @@ def test_reject_storage_server(reactor, request, temp_dir, flog_gatherer):
try:
yield util.run_tahoe(
reactor, request, "--node-directory", diana._node_dir,
reactor, request, "--node-directory", diana.process.node_dir,
"put", "-",
stdin="some content\n" * 200,
)

View File

@ -266,7 +266,7 @@ def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, nam
if exists(node_dir):
created_d = succeed(None)
else:
print("creating", node_dir)
print("creating: {}".format(node_dir))
mkdir(node_dir)
done_proto = _ProcessExitedProtocol()
args = [