Skip to content
Snippets Groups Projects
private.py 3.46 KiB
Newer Older
  • Learn to ignore specific revisions
  • # -*- coding: utf-8 -*-
    # Tahoe-LAFS -- secure, distributed storage grid
    #
    # Copyright © 2020 The Tahoe-LAFS Software Foundation
    #
    # Copyright 2019 PrivateStorage.io, LLC
    
    """
    Support code for applying token-based HTTP authorization rules to a
    Twisted Web resource hierarchy.
    """
    
    from __future__ import (
        print_function,
        unicode_literals,
        absolute_import,
        division,
    )
    
    import attr
    
    from zope.interface import (
        implementer,
    )
    
    from twisted.python.failure import (
        Failure,
    )
    from twisted.internet.defer import (
        succeed,
        fail,
    )
    from twisted.cred.credentials import (
        ICredentials,
    )
    from twisted.cred.portal import (
        IRealm,
        Portal,
    )
    from twisted.cred.checkers import (
        ANONYMOUS,
    )
    from twisted.cred.error import (
        UnauthorizedLogin,
    )
    from twisted.web.iweb import (
        ICredentialFactory,
    )
    from twisted.web.resource import (
        IResource,
    )
    from twisted.web.guard import (
        HTTPAuthSessionWrapper,
    )
    
    from cryptography.hazmat.primitives.constant_time import (
        bytes_eq,
    )
    
    # https://github.com/twisted/nevow/issues/106 may affect this code but if so
    # then the hotfix Tahoe-LAFS applies should deal with it.
    #
    # We want to avoid depending on the Tahoe-LAFS Python API since it isn't
    # public but we do want to make sure that hotfix is applied.  This seems like
    # an alright compromise.
    import allmydata.web.private as awp
    
    Tom Prince's avatar
    Tom Prince committed
    
    
    Tom Prince's avatar
    Tom Prince committed
    
    
    class IToken(ICredentials):
        """
        Credentials interface for token-based authorization.

        Extends ``ICredentials`` with a single declared method.
        """

        def check(auth_token):
            """
            Check these credentials against ``auth_token``.

            The declaration itself states no further contract (return value,
            error behavior); see the providers of this interface for details.
            """
    
    
    @implementer(IToken)
    @attr.s
    class Token(object):
        """
        Credentials wrapping the token a client proposed.

        :ivar bytes proposed_token: The token value to be compared against the
            valid token.
        """
        proposed_token = attr.ib(type=bytes)

        def equals(self, valid_token):
            """
            Compare the proposed token with ``valid_token``.

            The comparison is delegated to ``bytes_eq`` from cryptography's
            ``constant_time`` module rather than ``==``.

            :param bytes valid_token: The token which grants access.

            :return: The result of ``bytes_eq`` for the two tokens.
            """
            return bytes_eq(
                valid_token,
                self.proposed_token,
            )

        def check(self, auth_token):
            """
            Compare the proposed token with ``auth_token``.

            ``IToken`` declares ``check``, so it is provided here to make this
            class actually supply the interface it is declared to implement.
            It is exactly equivalent to ``equals``.

            :param bytes auth_token: The token which grants access.

            :return: The result of ``equals(auth_token)``.
            """
            return self.equals(auth_token)
    
    
    @attr.s
    class TokenChecker(object):
        """
        Credentials checker which accepts ``IToken`` credentials matching the
        token returned by ``get_auth_token``.

        :ivar get_auth_token: A no-argument callable returning the token that
            presented credentials are compared against.  It is called once for
            every ``requestAvatarId`` call.
        """
        get_auth_token = attr.ib()

        # The kinds of credentials this checker can evaluate.
        credentialInterfaces = [IToken]

        def requestAvatarId(self, credentials):
            """
            Decide whether ``credentials`` carry the required token.

            :param credentials: An object with an ``equals`` method accepting
                the required token.

            :return: A Deferred that has already fired: with ``ANONYMOUS`` if
                the tokens match, otherwise with a ``Failure`` wrapping
                ``UnauthorizedLogin``.
            """
            expected = self.get_auth_token()
            if not credentials.equals(expected):
                return fail(Failure(UnauthorizedLogin()))
            return succeed(ANONYMOUS)
    
    
    @implementer(ICredentialFactory)
    @attr.s
    class TokenCredentialFactory(object):
        """
        Credential factory which turns the response for the ``SCHEME``
        authorization scheme into ``Token`` credentials.
        """
        scheme = SCHEME
        authentication_realm = b"tahoe-lafs"

        def getChallenge(self, request):
            """
            Build the challenge parameters for ``request``.

            :param request: The request being challenged (not inspected).

            :return dict: A mapping with the single key ``b"realm"`` whose
                value is ``authentication_realm``.
            """
            challenge = {b"realm": self.authentication_realm}
            return challenge

        def decode(self, response, request):
            """
            Wrap the client's response in ``Token`` credentials.

            :param response: The value the client supplied; it is used as the
                proposed token without any transformation.

            :param request: The request the response arrived with (not
                inspected).

            :return Token: Credentials carrying ``response``.
            """
            return Token(proposed_token=response)
    
    
    @implementer(IRealm)
    @attr.s
    class PrivateRealm(object):
        """
        Realm which hands out one fixed resource as the avatar for every
        avatar id.

        :ivar _root: The object returned as the ``IResource`` avatar.
        """
        _root = attr.ib()

        def _logout(self):
            """
            Do nothing; there is no per-login state to clean up here.
            """

        def requestAvatar(self, avatarId, mind, *interfaces):
            """
            Return the root resource if an ``IResource`` avatar is requested.

            :param avatarId: Ignored; every avatar id gets the same resource.

            :param mind: Ignored.

            :param interfaces: The interfaces the caller can accept an avatar
                for.

            :return: A three-tuple of ``IResource``, the root resource and the
                logout callable.

            :raise NotImplementedError: If ``IResource`` is not among
                ``interfaces``.
            """
            if IResource not in interfaces:
                raise NotImplementedError(
                    "PrivateRealm supports IResource not {}".format(interfaces),
                )
            return (IResource, self._root, self._logout)
    
    
    def _create_private_tree(get_auth_token, vulnerable):
        """
        Put ``vulnerable`` behind a token-checking ``HTTPAuthSessionWrapper``.

        :param get_auth_token: A callable handed to ``TokenChecker`` to obtain
            the valid token.

        :param vulnerable: The resource handed to ``PrivateRealm`` as its root.

        :return HTTPAuthSessionWrapper: The wrapper built around a ``Portal``
            for the realm and checker, using a ``TokenCredentialFactory``.
        """
        checkers = [TokenChecker(get_auth_token)]
        credential_factories = [TokenCredentialFactory()]
        return HTTPAuthSessionWrapper(
            Portal(PrivateRealm(vulnerable), checkers),
            credential_factories,
        )
    
    
    def create_private_tree(get_auth_token, vulnerable_tree):
        """
        Create a new resource tree that only allows requests if they include a
        correct `Authorization: tahoe-lafs <api_auth_token>` header (where
        `api_auth_token` matches the private configuration value).

        :param (IO -> bytes) get_auth_token: Get the valid authorization token.

        :param IResource vulnerable_tree: The resource hierarchy which will be
            protected by the authorization mechanism.

        :return: Whatever ``_create_private_tree`` builds for these arguments.
        """
        return _create_private_tree(get_auth_token, vulnerable_tree)