diff --git a/src/_zkapauthorizer/private.py b/src/_zkapauthorizer/private.py new file mode 100644 index 0000000000000000000000000000000000000000..baf837206b53e067be86446bc956023908a303e1 --- /dev/null +++ b/src/_zkapauthorizer/private.py @@ -0,0 +1,150 @@ +# -*- coding: utf-8 -*- +# Tahoe-LAFS -- secure, distributed storage grid +# +# Copyright © 2020 The Tahoe-LAFS Software Foundation +# +# Copyright 2019 PrivateStorage.io, LLC + +""" +Support code for applying token-based HTTP authorization rules to a +Twisted Web resource hierarchy. +""" + +from __future__ import ( + print_function, + unicode_literals, + absolute_import, + division, +) + +import attr + +from zope.interface import ( + implementer, +) + +from twisted.python.failure import ( + Failure, +) +from twisted.internet.defer import ( + succeed, + fail, +) +from twisted.cred.credentials import ( + ICredentials, +) +from twisted.cred.portal import ( + IRealm, + Portal, +) +from twisted.cred.checkers import ( + ANONYMOUS, +) +from twisted.cred.error import ( + UnauthorizedLogin, +) +from twisted.web.iweb import ( + ICredentialFactory, +) +from twisted.web.resource import ( + IResource, +) +from twisted.web.guard import ( + HTTPAuthSessionWrapper, +) + +from cryptography.hazmat.primitives.constant_time import ( + bytes_eq, +) + +# https://github.com/twisted/nevow/issues/106 may affect this code but if so +# then the hotfix Tahoe-LAFS applies should deal with it. +# +# We want to avoid depending on the Tahoe-LAFS Python API since it isn't +# public but we do want to make sure that hotfix is applied. This seems like +# an alright compromise. +import allmydata.web.private as awp +del awp + +SCHEME = b"tahoe-lafs" + +class IToken(ICredentials): + def check(auth_token): + pass + + +@implementer(IToken) +@attr.s +class Token(object): + proposed_token = attr.ib(type=bytes) + + def equals(self, valid_token): + return bytes_eq( + valid_token, + self.proposed_token, + ) + + +@attr.s +class TokenChecker(object): + get_auth_token = attr.ib() + + credentialInterfaces = [IToken] + + def requestAvatarId(self, credentials): + required_token = self.get_auth_token() + if credentials.equals(required_token): + return succeed(ANONYMOUS) + return fail(Failure(UnauthorizedLogin())) + + +@implementer(ICredentialFactory) +@attr.s +class TokenCredentialFactory(object): + scheme = SCHEME + authentication_realm = b"tahoe-lafs" + + def getChallenge(self, request): + return {b"realm": self.authentication_realm} + + def decode(self, response, request): + return Token(response) + + +@implementer(IRealm) +@attr.s +class PrivateRealm(object): + _root = attr.ib() + + def _logout(self): + pass + + def requestAvatar(self, avatarId, mind, *interfaces): + if IResource in interfaces: + return (IResource, self._root, self._logout) + raise NotImplementedError( + "PrivateRealm supports IResource not {}".format(interfaces), + ) + + +def _create_private_tree(get_auth_token, vulnerable): + realm = PrivateRealm(vulnerable) + portal = Portal(realm, [TokenChecker(get_auth_token)]) + return HTTPAuthSessionWrapper(portal, [TokenCredentialFactory()]) + + +def create_private_tree(get_auth_token, vulnerable_tree): + """ + Create a new resource tree that only allows requests if they include a + correct `Authorization: tahoe-lafs <api_auth_token>` header (where + `api_auth_token` matches the private configuration value). + + :param (IO -> bytes) get_auth_token: Get the valid authorization token. + + :param IResource vulnerable_tree: Create the resource + hierarchy which will be protected by the authorization mechanism. + """ + return _create_private_tree( + get_auth_token, + vulnerable_tree, + ) diff --git a/src/_zkapauthorizer/tests/test_private.py b/src/_zkapauthorizer/tests/test_private.py new file mode 100644 index 0000000000000000000000000000000000000000..9382eb54021adea9a893b1b412c065398a4f0704 --- /dev/null +++ b/src/_zkapauthorizer/tests/test_private.py @@ -0,0 +1,119 @@ +# -*- coding: utf-8 -*- +# Tahoe-LAFS -- secure, distributed storage grid +# +# Copyright © 2020 The Tahoe-LAFS Software Foundation +# +# Copyright 2019 PrivateStorage.io, LLC + +""" +Tests for ``_zkapauthorizer.private``. +""" + +from __future__ import ( + print_function, + unicode_literals, + absolute_import, + division, +) + +from testtools import ( + TestCase, +) +from testtools.matchers import ( + Equals, +) +from testtools.twistedsupport import ( + succeeded, +) + +from twisted.web.http import ( + UNAUTHORIZED, + NOT_FOUND, +) +from twisted.web.http_headers import ( + Headers, +) +from twisted.web.resource import ( + Resource, +) + +from treq.client import ( + HTTPClient, +) +from treq.testing import ( + RequestTraversalAgent, +) + +from ..private import ( + SCHEME, + create_private_tree, +) + +from allmydata.test.web.matchers import ( + has_response_code, +) + +class PrivacyTests(TestCase): + """ + Tests for the privacy features of the resources created by ``create_private_tree``. + """ + def setUp(self): + self.token = b"abcdef" + self.resource = create_private_tree(lambda: self.token, Resource()) + self.agent = RequestTraversalAgent(self.resource) + self.client = HTTPClient(self.agent) + return super(PrivacyTests, self).setUp() + + def _authorization(self, scheme, value): + return Headers({ + u"authorization": [u"{} {}".format(scheme, value)], + }) + + def test_unauthorized(self): + """ + A request without an *Authorization* header receives an *Unauthorized* response. + """ + self.assertThat( + self.client.head(b"http:///foo/bar"), + succeeded(has_response_code(Equals(UNAUTHORIZED))), + ) + + def test_wrong_scheme(self): + """ + A request with an *Authorization* header not containing the Tahoe-LAFS + scheme receives an *Unauthorized* response. + """ + self.assertThat( + self.client.head( + b"http:///foo/bar", + headers=self._authorization(u"basic", self.token), + ), + succeeded(has_response_code(Equals(UNAUTHORIZED))), + ) + + def test_wrong_token(self): + """ + A request with an *Authorization* header not containing the expected token + receives an *Unauthorized* response. + """ + self.assertThat( + self.client.head( + b"http:///foo/bar", + headers=self._authorization(SCHEME, u"foo bar"), + ), + succeeded(has_response_code(Equals(UNAUTHORIZED))), + ) + + def test_authorized(self): + """ + A request with an *Authorization* header containing the expected scheme + and token does not receive an *Unauthorized* response. + """ + self.assertThat( + self.client.head( + b"http:///foo/bar", + headers=self._authorization(SCHEME, self.token), + ), + # It's a made up URL so we don't get a 200, either, but a 404. + succeeded(has_response_code(Equals(NOT_FOUND))), + )