tahoe-lafs/src/allmydata/web/private.py

145 lines
3.0 KiB
Python
Raw Normal View History

2019-03-21 19:00:57 +00:00
from __future__ import (
print_function,
unicode_literals,
absolute_import,
division,
)
import attr
from zope.interface import (
implementer,
)
from twisted.python.failure import (
Failure,
)
from twisted.internet.defer import (
succeed,
fail,
)
from twisted.cred.credentials import (
ICredentials,
)
from twisted.cred.portal import (
IRealm,
Portal,
)
from twisted.cred.checkers import (
ANONYMOUS,
)
from twisted.cred.error import (
UnauthorizedLogin,
)
from twisted.web.iweb import (
ICredentialFactory,
)
2019-03-21 19:00:57 +00:00
from twisted.web.resource import (
IResource,
2019-03-21 19:00:57 +00:00
Resource,
)
from twisted.web.guard import (
HTTPAuthSessionWrapper,
)
from ..util.hashutil import (
timing_safe_compare,
)
2019-03-22 20:42:50 +00:00
from ..util.assertutil import (
precondition,
)
2019-03-21 19:00:57 +00:00
from .logs import (
create_log_resources,
)
SCHEME = b"tahoe-lafs"
class IToken(ICredentials):
def check(auth_token):
pass
# Shoobx/mypy-zope#26
_itoken_impl = implementer(IToken)
@_itoken_impl
@attr.s
class Token(object):
proposed_token = attr.ib(type=bytes)
def equals(self, valid_token):
return timing_safe_compare(
valid_token,
self.proposed_token,
)
@attr.s
class TokenChecker(object):
get_auth_token = attr.ib()
credentialInterfaces = [IToken]
def requestAvatarId(self, credentials):
2019-03-22 20:42:50 +00:00
required_token = self.get_auth_token()
precondition(isinstance(required_token, bytes))
if credentials.equals(required_token):
return succeed(ANONYMOUS)
return fail(Failure(UnauthorizedLogin()))
@implementer(ICredentialFactory)
@attr.s
class TokenCredentialFactory(object):
scheme = SCHEME
authentication_realm = b"tahoe-lafs"
def getChallenge(self, request):
return {b"realm": self.authentication_realm}
def decode(self, response, request):
return Token(response)
@implementer(IRealm)
@attr.s
class PrivateRealm(object):
_root = attr.ib()
def _logout(self):
pass
def requestAvatar(self, avatarId, mind, *interfaces):
if IResource in interfaces:
return (IResource, self._root, self._logout)
raise NotImplementedError(
"PrivateRealm supports IResource not {}".format(interfaces),
)
def _create_vulnerable_tree():
2019-03-21 19:00:57 +00:00
private = Resource()
private.putChild(b"logs", create_log_resources())
2019-03-21 19:00:57 +00:00
return private
def _create_private_tree(get_auth_token, vulnerable):
realm = PrivateRealm(vulnerable)
portal = Portal(realm, [TokenChecker(get_auth_token)])
return HTTPAuthSessionWrapper(portal, [TokenCredentialFactory()])
def create_private_tree(get_auth_token):
"""
Create a new resource tree that only allows requests if they include a
correct `Authorization: tahoe-lafs <api_auth_token>` header (where
`api_auth_token` matches the private configuration value).
"""
return _create_private_tree(
get_auth_token,
_create_vulnerable_tree(),
)