diff --git a/integration/test_streaming_logs.py b/integration/test_streaming_logs.py
new file mode 100644
index 000000000..32b97644d
--- /dev/null
+++ b/integration/test_streaming_logs.py
@@ -0,0 +1,178 @@
+from __future__ import (
+    print_function,
+    unicode_literals,
+    absolute_import,
+    division,
+)
+
+import json
+
+from os.path import (
+    join,
+)
+from urlparse import (
+    urlsplit,
+)
+
+import attr
+
+from twisted.internet.defer import (
+    Deferred,
+)
+from twisted.internet.endpoints import (
+    HostnameEndpoint,
+)
+
+import treq
+
+from autobahn.twisted.websocket import (
+    WebSocketClientFactory,
+    WebSocketClientProtocol,
+)
+
+from allmydata.client import (
+    read_config,
+)
+from allmydata.web.private import (
+    SCHEME,
+)
+from allmydata.util.eliotutil import (
+    inline_callbacks,
+)
+
+import pytest_twisted
+
+def _url_to_endpoint(reactor, url):
+    """
+    Build a client endpoint for the host and port named in ``url``.
+
+    The network location of ``url`` must have the form ``host:port``.
+    """
+    netloc = urlsplit(url).netloc
+    host, port = netloc.split(":")
+    return HostnameEndpoint(reactor, host, int(port))
+
+
+class _StreamingLogClientProtocol(WebSocketClientProtocol):
+    """
+    A WebSocket client that reports protocol events through Deferreds.
+
+    ``on_message`` and ``on_close`` are not created by this class; they must
+    be assigned on the instance before a message or close event arrives.
+    """
+    def onOpen(self):
+        self.factory.on_open.callback(self)
+
+    def onMessage(self, payload, isBinary):
+        # A Deferred can fire only once but any number of messages may
+        # arrive, so only the first message is reported.
+        if not self.on_message.called:
+            self.on_message.callback(payload)
+
+    def onClose(self, wasClean, code, reason):
+        self.on_close.callback(reason)
+
+
+def _connect_client(reactor, api_auth_token, ws_url):
+    """
+    Connect a ``_StreamingLogClientProtocol`` to ``ws_url``, sending
+    ``api_auth_token`` in the *Authorization* header.
+
+    :return: The Deferred returned by the endpoint's ``connect`` method.
+    """
+    factory = WebSocketClientFactory(
+        url=ws_url,
+        headers={
+            "Authorization": "{} {}".format(SCHEME, api_auth_token),
+        }
+    )
+    factory.protocol = _StreamingLogClientProtocol
+    factory.on_open = Deferred()
+
+    endpoint = _url_to_endpoint(reactor, ws_url)
+    return endpoint.connect(factory)
+
+
+def _race(left, right):
+    """
+    Wait for the first result from either of two Deferreds.
+
+    Any result, success or failure, causes the return Deferred to fire. It
+    fires with either a Left or a Right instance depending on whether the left
+    or right argument fired first.
+
+    The Deferred that loses the race is cancelled and any result it eventually
+    produces is discarded.
+    """
+    racing = [True]
+    def got_result(result, which):
+        if racing:
+            racing.pop()
+            # ``which.pick`` selects the Deferred which just fired.  The
+            # other one lost the race and is the one to cancel.
+            winner = which.pick(left, right)
+            loser = right if winner is left else left
+            loser.cancel()
+            finished.callback(which(result))
+
+    finished = Deferred()
+    left.addBoth(got_result, Left)
+    right.addBoth(got_result, Right)
+    return finished
+
+
+@attr.s
+class Left(object):
+    """
+    The result of a race won by the left Deferred.
+    """
+    value = attr.ib()
+
+    @classmethod
+    def pick(cls, left, right):
+        return left
+
+
+@attr.s
+class Right(object):
+    """
+    The result of a race won by the right Deferred.
+    """
+    value = attr.ib()
+
+    @classmethod
+    def pick(cls, left, right):
+        return right
+
+
+@inline_callbacks
+def _test_streaming_logs(reactor, temp_dir, alice):
+    """
+    Connect to the log stream of the node in ``temp_dir``'s *alice* directory
+    and require that a JSON message arrives before the connection closes.
+    """
+    cfg = read_config(join(temp_dir, "alice"), "portnum")
+    node_url = cfg.get_config_from_file("node.url")
+    api_auth_token = cfg.get_private_config("api_auth_token")
+
+    ws_url = node_url.replace("http://", "ws://")
+    log_url = ws_url + "private/logs/v1"
+
+    print("Connecting to {}".format(log_url))
+    client = yield _connect_client(reactor, api_auth_token, log_url)
+    print("Connected.")
+    client.on_close = Deferred()
+    client.on_message = Deferred()
+
+    # Provoke _some_ log event.
+    yield treq.get(node_url)
+
+    result = yield _race(client.on_close, client.on_message)
+
+    assert isinstance(result, Right)
+    json.loads(result.value)
+
+
+@pytest_twisted.inlineCallbacks
+def test_streaming_logs(reactor, temp_dir, alice):
+    yield _test_streaming_logs(reactor, temp_dir, alice)
diff --git a/newsfragments/3006.feature b/newsfragments/3006.feature
new file mode 100644
index 000000000..d9ce19b54
--- /dev/null
+++ b/newsfragments/3006.feature
@@ -0,0 +1 @@
+The web API now publishes streaming Eliot logs via a token-protected WebSocket at /private/logs/v1.
\ No newline at end of file diff --git a/src/allmydata/test/web/matchers.py b/src/allmydata/test/web/matchers.py new file mode 100644 index 000000000..99c91ef5c --- /dev/null +++ b/src/allmydata/test/web/matchers.py @@ -0,0 +1,28 @@ +import attr + +from testtools.matchers import Mismatch + +@attr.s +class _HasResponseCode(object): + match_expected_code = attr.ib() + + def match(self, response): + actual_code = response.code + mismatch = self.match_expected_code.match(actual_code) + if mismatch is None: + return None + return Mismatch( + u"Response {} code: {}".format( + response, + mismatch.describe(), + ), + mismatch.get_details(), + ) + +def has_response_code(match_expected_code): + """ + Match a Treq response with the given code. + + :param match_expected_code: A matcher applied to the HTTP response code of the response. + """ + return _HasResponseCode(match_expected_code) diff --git a/src/allmydata/test/web/test_logs.py b/src/allmydata/test/web/test_logs.py new file mode 100644 index 000000000..4895ed6f0 --- /dev/null +++ b/src/allmydata/test/web/test_logs.py @@ -0,0 +1,59 @@ +""" +Tests for ``allmydata.web.logs``. +""" + +from __future__ import ( + print_function, + unicode_literals, + absolute_import, + division, +) + +from testtools.matchers import ( + Equals, +) +from testtools.twistedsupport import ( + succeeded, +) + +from twisted.web.http import ( + OK, +) + +from treq.client import ( + HTTPClient, +) +from treq.testing import ( + RequestTraversalAgent, +) + +from .matchers import ( + has_response_code, +) + +from ..common import ( + SyncTestCase, +) + +from ...web.logs import ( + create_log_resources, +) + +class StreamingEliotLogsTests(SyncTestCase): + """ + Tests for the log streaming resources created by ``create_log_resources``. 
+ """ + def setUp(self): + self.resource = create_log_resources() + self.agent = RequestTraversalAgent(self.resource) + self.client = HTTPClient(self.agent) + return super(StreamingEliotLogsTests, self).setUp() + + def test_v1(self): + """ + There is a resource at *v1*. + """ + self.assertThat( + self.client.get(b"http:///v1"), + succeeded(has_response_code(Equals(OK))), + ) diff --git a/src/allmydata/test/web/test_private.py b/src/allmydata/test/web/test_private.py new file mode 100644 index 000000000..27ddbcf78 --- /dev/null +++ b/src/allmydata/test/web/test_private.py @@ -0,0 +1,110 @@ +""" +Tests for ``allmydata.web.private``. +""" + +from __future__ import ( + print_function, + unicode_literals, + absolute_import, + division, +) + +from testtools.matchers import ( + Equals, +) +from testtools.twistedsupport import ( + succeeded, +) + +from twisted.web.http import ( + UNAUTHORIZED, + NOT_FOUND, +) +from twisted.web.http_headers import ( + Headers, +) + +from treq.client import ( + HTTPClient, +) +from treq.testing import ( + RequestTraversalAgent, +) + +from ..common import ( + SyncTestCase, +) + +from ...web.private import ( + SCHEME, + create_private_tree, +) + +from .matchers import ( + has_response_code, +) + +class PrivacyTests(SyncTestCase): + """ + Tests for the privacy features of the resources created by ``create_private_tree``. + """ + def setUp(self): + self.token = b"abcdef" + self.resource = create_private_tree(lambda: self.token) + self.agent = RequestTraversalAgent(self.resource) + self.client = HTTPClient(self.agent) + return super(PrivacyTests, self).setUp() + + def _authorization(self, scheme, value): + return Headers({ + u"authorization": [u"{} {}".format(scheme, value)], + }) + + def test_unauthorized(self): + """ + A request without an *Authorization* header receives an *Unauthorized* response. 
+ """ + self.assertThat( + self.client.head(b"http:///foo/bar"), + succeeded(has_response_code(Equals(UNAUTHORIZED))), + ) + + def test_wrong_scheme(self): + """ + A request with an *Authorization* header not containing the Tahoe-LAFS + scheme receives an *Unauthorized* response. + """ + self.assertThat( + self.client.head( + b"http:///foo/bar", + headers=self._authorization(u"basic", self.token), + ), + succeeded(has_response_code(Equals(UNAUTHORIZED))), + ) + + def test_wrong_token(self): + """ + A request with an *Authorization* header not containing the expected token + receives an *Unauthorized* response. + """ + self.assertThat( + self.client.head( + b"http:///foo/bar", + headers=self._authorization(SCHEME, u"foo bar"), + ), + succeeded(has_response_code(Equals(UNAUTHORIZED))), + ) + + def test_authorized(self): + """ + A request with an *Authorization* header containing the expected scheme + and token does not receive an *Unauthorized* response. + """ + self.assertThat( + self.client.head( + b"http:///foo/bar", + headers=self._authorization(SCHEME, self.token), + ), + # It's a made up URL so we don't get a 200, either, but a 404. + succeeded(has_response_code(Equals(NOT_FOUND))), + ) diff --git a/src/allmydata/web/_autobahn_1151.py b/src/allmydata/web/_autobahn_1151.py new file mode 100644 index 000000000..837656cf2 --- /dev/null +++ b/src/allmydata/web/_autobahn_1151.py @@ -0,0 +1,27 @@ +""" +Implement a work-around for autobahn-python issue 1151 (https://github.com/crossbario/autobahn-python/issues/1151). +""" + + +from __future__ import ( + print_function, + unicode_literals, + absolute_import, + division, +) + + +from autobahn.websocket.protocol import WebSocketProtocol +_originalConnectionLost = WebSocketProtocol._connectionLost + +def _connectionLost(self, reason): + if self.openHandshakeTimeoutCall is not None: + self.openHandshakeTimeoutCall.cancel() + self.openHandshakeTimeoutCall = None + return _originalConnectionLost(self, reason) + +def patch(): + """ + Monkey-patch the proposed fix into place. 
+ """ + WebSocketProtocol._connectionLost = _connectionLost diff --git a/src/allmydata/web/_nevow_106.py b/src/allmydata/web/_nevow_106.py new file mode 100644 index 000000000..3379767a5 --- /dev/null +++ b/src/allmydata/web/_nevow_106.py @@ -0,0 +1,31 @@ +""" +Implement a work-around for nevow issue 106 (https://github.com/twisted/nevow/issues/106). +""" + +from __future__ import ( + print_function, + unicode_literals, + absolute_import, + division, +) + +from nevow import inevow +from twisted.internet import defer + +def renderHTTP(self, ctx): + request = inevow.IRequest(ctx) + if self.real_prepath_len is not None: + request.postpath = request.prepath + request.postpath + request.prepath = request.postpath[:self.real_prepath_len] + del request.postpath[:self.real_prepath_len] + result = defer.maybeDeferred(self.original.render, request).addCallback( + self._handle_NOT_DONE_YET, request) + return result + + +def patch(): + """ + Monkey-patch the proposed fix into place. + """ + from nevow.appserver import OldResourceAdapter + OldResourceAdapter.renderHTTP = renderHTTP diff --git a/src/allmydata/web/logs.py b/src/allmydata/web/logs.py new file mode 100644 index 000000000..0ba8b17e9 --- /dev/null +++ b/src/allmydata/web/logs.py @@ -0,0 +1,79 @@ +from __future__ import ( + print_function, + unicode_literals, + absolute_import, + division, +) + +import json + +from autobahn.twisted.resource import WebSocketResource +from autobahn.twisted.websocket import ( + WebSocketServerFactory, + WebSocketServerProtocol, +) +import eliot + +from twisted.web.resource import ( + Resource, +) + +# Hotfix work-around https://github.com/crossbario/autobahn-python/issues/1151 +from . import _autobahn_1151 +_autobahn_1151.patch() +del _autobahn_1151 + + +class TokenAuthenticatedWebSocketServerProtocol(WebSocketServerProtocol): + """ + A WebSocket protocol that streams Eliot log messages to its client. + It does no authentication of its own: the token check is done by the + ``HTTPAuthSessionWrapper`` built in ``allmydata.web.private``. 
+ """ + + def onConnect(self, req): + """ + WebSocket callback + """ + # we don't care what WebSocket sub-protocol is + # negotiated, nor do we need to send headers to the + # client, so we ask Autobahn to just allow this + # connection with the defaults. We could return a + # (headers, protocol) pair here instead if required. + return None + + def _received_eliot_log(self, message): + """ + While this WebSocket connection is open, this function is + registered as an eliot destination + """ + # probably want a try/except around here? what do we do if + # transmission fails or anything else bad happens? + self.sendMessage(json.dumps(message)) + + def onOpen(self): + """ + WebSocket callback + """ + eliot.add_destination(self._received_eliot_log) + + def onClose(self, wasClean, code, reason): + """ + WebSocket callback + """ + try: + eliot.remove_destination(self._received_eliot_log) + except ValueError: + pass + + +def create_log_streaming_resource(): + factory = WebSocketServerFactory() + factory.protocol = TokenAuthenticatedWebSocketServerProtocol + return WebSocketResource(factory) + + +def create_log_resources(): + logs = Resource() + logs.putChild(b"v1", create_log_streaming_resource()) + return logs diff --git a/src/allmydata/web/private.py b/src/allmydata/web/private.py new file mode 100644 index 000000000..fdf67a64f --- /dev/null +++ b/src/allmydata/web/private.py @@ -0,0 +1,145 @@ + +from __future__ import ( + print_function, + unicode_literals, + absolute_import, + division, +) + +import attr + +from zope.interface import ( + implementer, +) + +from twisted.python.failure import ( + Failure, +) +from twisted.internet.defer import ( + succeed, + fail, +) +from twisted.cred.credentials import ( + ICredentials, +) +from twisted.cred.portal import ( + IRealm, + Portal, +) +from twisted.cred.checkers import ( + ANONYMOUS, +) +from twisted.cred.error import ( + UnauthorizedLogin, +) +from twisted.web.iweb import ( + ICredentialFactory, +) +from 
twisted.web.resource import ( + IResource, + Resource, +) +from twisted.web.guard import ( + HTTPAuthSessionWrapper, +) + +from ..util.hashutil import ( + timing_safe_compare, +) +from ..util.assertutil import ( + precondition, +) + +from .logs import ( + create_log_resources, +) + +# Hotfix work-around https://github.com/twisted/nevow/issues/106 +from . import _nevow_106 +_nevow_106.patch() +del _nevow_106 + +SCHEME = b"tahoe-lafs" + +class IToken(ICredentials): + def equals(valid_token): + pass + + +@implementer(IToken) +@attr.s +class Token(object): + proposed_token = attr.ib(type=bytes) + + def equals(self, valid_token): + return timing_safe_compare( + valid_token, + self.proposed_token, + ) + + +@attr.s +class TokenChecker(object): + get_auth_token = attr.ib() + + credentialInterfaces = [IToken] + + def requestAvatarId(self, credentials): + required_token = self.get_auth_token() + precondition(isinstance(required_token, bytes)) + if credentials.equals(required_token): + return succeed(ANONYMOUS) + return fail(Failure(UnauthorizedLogin())) + + +@implementer(ICredentialFactory) +@attr.s +class TokenCredentialFactory(object): + scheme = SCHEME + authentication_realm = b"tahoe-lafs" + + def getChallenge(self, request): + return {b"realm": self.authentication_realm} + + def decode(self, response, request): + return Token(response) + + +@implementer(IRealm) +@attr.s +class PrivateRealm(object): + _root = attr.ib() + + def _logout(self): + pass + + def requestAvatar(self, avatarId, mind, *interfaces): + if IResource in interfaces: + return (IResource, self._root, self._logout) + raise NotImplementedError( + "PrivateRealm supports IResource not {}".format(interfaces), + ) + + +def _create_vulnerable_tree(): + private = Resource() + private.putChild(b"logs", create_log_resources()) + return private + + +def _create_private_tree(get_auth_token, vulnerable): + realm = PrivateRealm(vulnerable) + portal = Portal(realm, [TokenChecker(get_auth_token)]) + return 
HTTPAuthSessionWrapper(portal, [TokenCredentialFactory()]) + + +def create_private_tree(get_auth_token): + """ + Create a new resource tree that only allows requests if they include a + correct `Authorization: tahoe-lafs api_auth_token` header (where + `api_auth_token` matches the private configuration value). + """ + return _create_private_tree( + get_auth_token, + _create_vulnerable_tree(), + ) diff --git a/src/allmydata/web/root.py b/src/allmydata/web/root.py index ee8e1558b..4486c9530 100644 --- a/src/allmydata/web/root.py +++ b/src/allmydata/web/root.py @@ -25,7 +25,9 @@ from allmydata.web.common import ( render_time, render_time_attr, ) - +from allmydata.web.private import ( + create_private_tree, +) class URIHandler(RenderMixin, rend.Page): # I live at /uri . There are several operations defined on /uri itself, @@ -137,6 +139,7 @@ class IncidentReporter(RenderMixin, rend.Page): SPACE = u"\u00A0"*2 + class Root(MultiFormatPage): addSlash = True @@ -167,6 +170,11 @@ class Root(MultiFormatPage): # handler for "/magic_folder" URIs self.child_magic_folder = magic_folder.MagicFolderWebApi(client) + # Handler for everything beneath "/private", an area of the resource + # hierarchy which is only accessible with the private per-node API + # auth token. + self.child_private = create_private_tree(client.get_auth_token) + self.child_file = FileHandler(client) self.child_named = FileHandler(client) self.child_status = status.Status(client.get_history()) diff --git a/src/allmydata/webish.py b/src/allmydata/webish.py index bd404bbd4..c661e1886 100644 --- a/src/allmydata/webish.py +++ b/src/allmydata/webish.py @@ -161,6 +161,7 @@ class WebishServer(service.MultiService): # twisted.internet.task.Clock that is provided by the unit tests # so that they can test features that involve the passage of # time in a deterministic manner. 
+ self.root = root.Root(client, clock, now_fn) self.buildServer(webport, nodeurl_path, staticdir) if self.root.child_operations: diff --git a/ws_client.py b/ws_client.py new file mode 100644 index 000000000..15d717d42 --- /dev/null +++ b/ws_client.py @@ -0,0 +1,87 @@ +from __future__ import print_function + +import sys +import json + +from twisted.internet.error import ConnectError +from twisted.internet.task import react +from twisted.internet.defer import inlineCallbacks, Deferred +from twisted.internet.endpoints import HostnameEndpoint + +from autobahn.twisted.websocket import ( + WebSocketClientProtocol, + WebSocketClientFactory, +) + +from allmydata.client import read_config + + +class TahoeLogProtocol(WebSocketClientProtocol): + """ + """ + + def onOpen(self): + self.factory.on_open.callback(self) + + def onMessage(self, payload, isBinary): + if False: + log_data = json.loads(payload.decode('utf8')) + print("eliot message:") + for k, v in log_data.items(): + print(" {}: {}".format(k, v)) + else: + print(payload) + sys.stdout.flush() + + def onClose(self, *args): + if not self.factory.on_open.called: + self.factory.on_open.errback( + RuntimeError("Failed: {}".format(args)) + ) + self.factory.on_close.callback(self) + + +@inlineCallbacks +def main(reactor): + + from twisted.python import log + log.startLogging(sys.stdout) + + tahoe_dir = "testgrid/alice" + cfg = read_config(tahoe_dir, "portnum") + + token = cfg.get_private_config("api_auth_token").strip() + webport = cfg.get_config("node", "web.port") + if webport.startswith("tcp:"): + port = webport.split(':')[1] + else: + port = webport + + factory = WebSocketClientFactory( + url=u"ws://127.0.0.1:{}/private/logs/v1".format(port), + headers={ + "Authorization": "tahoe-lafs {}".format(token), + } + ) + factory.on_open = Deferred() + factory.on_close = Deferred() + + factory.protocol = TahoeLogProtocol + + endpoint = HostnameEndpoint(reactor, "127.0.0.1", int(port)) + try: + port = yield 
endpoint.connect(factory) + except ConnectError as e: + print("Connection failed: {}".format(e)) + return + + print("port: {}".format(port)) + yield factory.on_open + print("opened") + yield factory.on_close + print("closed") + + + +if __name__ == '__main__': + react(main)