Skip to content
Snippets Groups Projects
private.py 3.25 KiB
Newer Older
  • Learn to ignore specific revisions
  • # -*- coding: utf-8 -*-
    # Tahoe-LAFS -- secure, distributed storage grid
    #
    # Copyright © 2020 The Tahoe-LAFS Software Foundation
    #
    # Copyright 2019 PrivateStorage.io, LLC
    
    """
    Support code for applying token-based HTTP authorization rules to a
    Twisted Web resource hierarchy.
    """
    
    # https://github.com/twisted/nevow/issues/106 may affect this code but if so
    # then the hotfix Tahoe-LAFS applies should deal with it.
    #
    # We want to avoid depending on the Tahoe-LAFS Python API since it isn't
    # public but we do want to make sure that hotfix is applied.  This seems like
    # an alright compromise.
    import allmydata.web.private as awp
    
    Tom Prince's avatar
    Tom Prince committed
    import attr
    from cryptography.hazmat.primitives.constant_time import bytes_eq
    from twisted.cred.checkers import ANONYMOUS
    from twisted.cred.credentials import ICredentials
    from twisted.cred.error import UnauthorizedLogin
    from twisted.cred.portal import IRealm, Portal
    from twisted.internet.defer import fail, succeed
    from twisted.python.failure import Failure
    from twisted.web.guard import HTTPAuthSessionWrapper
    from twisted.web.iweb import ICredentialFactory
    from twisted.web.resource import IResource
    from zope.interface import implementer
    
    Tom Prince's avatar
    Tom Prince committed
    
    
    Tom Prince's avatar
    Tom Prince committed
    
    
    class IToken(ICredentials):
        def check(auth_token):
            pass
    
    
    @implementer(IToken)
    @attr.s
    class Token(object):
        proposed_token = attr.ib(type=bytes)
    
        def equals(self, valid_token):
            return bytes_eq(
                valid_token,
                self.proposed_token,
            )
    
    
    @attr.s
    class TokenChecker(object):
        get_auth_token = attr.ib()
    
        credentialInterfaces = [IToken]
    
        def requestAvatarId(self, credentials):
            required_token = self.get_auth_token()
            if credentials.equals(required_token):
                return succeed(ANONYMOUS)
            return fail(Failure(UnauthorizedLogin()))
    
    
    @implementer(ICredentialFactory)
    @attr.s
    class TokenCredentialFactory(object):
        scheme = SCHEME
        authentication_realm = b"tahoe-lafs"
    
        def getChallenge(self, request):
            return {b"realm": self.authentication_realm}
    
        def decode(self, response, request):
            return Token(response)
    
    
    @implementer(IRealm)
    @attr.s
    class PrivateRealm(object):
        _root = attr.ib()
    
        def _logout(self):
            pass
    
        def requestAvatar(self, avatarId, mind, *interfaces):
            if IResource in interfaces:
                return (IResource, self._root, self._logout)
            raise NotImplementedError(
                "PrivateRealm supports IResource not {}".format(interfaces),
            )
    
    
    def _create_private_tree(get_auth_token, vulnerable):
        realm = PrivateRealm(vulnerable)
        portal = Portal(realm, [TokenChecker(get_auth_token)])
        return HTTPAuthSessionWrapper(portal, [TokenCredentialFactory()])
    
    
    def create_private_tree(get_auth_token, vulnerable_tree):
        """
        Create a new resource tree that only allows requests if they include a
        correct `Authorization: tahoe-lafs <api_auth_token>` header (where
        `api_auth_token` matches the private configuration value).
    
        :param (IO -> bytes) get_auth_token: Get the valid authorization token.
    
        :param IResource vulnerable_tree: Create the resource
            hierarchy which will be protected by the authorization mechanism.
        """
        return _create_private_tree(
            get_auth_token,
            vulnerable_tree,
        )