Newer
Older
Jean-Paul Calderone
committed
# -*- coding: utf-8 -*-
# Tahoe-LAFS -- secure, distributed storage grid
#
# Copyright © 2020 The Tahoe-LAFS Software Foundation
#
# Copyright 2019 PrivateStorage.io, LLC
"""
Support code for applying token-based HTTP authorization rules to a
Twisted Web resource hierarchy.
"""
# https://github.com/twisted/nevow/issues/106 may affect this code but if so
# then the hotfix Tahoe-LAFS applies should deal with it.
#
# We want to avoid depending on the Tahoe-LAFS Python API since it isn't
# public but we do want to make sure that hotfix is applied. This seems like
# an alright compromise.
import allmydata.web.private as awp
import attr
from cryptography.hazmat.primitives.constant_time import bytes_eq
from twisted.cred.checkers import ANONYMOUS
from twisted.cred.credentials import ICredentials
from twisted.cred.error import UnauthorizedLogin
from twisted.cred.portal import IRealm, Portal
from twisted.internet.defer import fail, succeed
from twisted.python.failure import Failure
from twisted.web.guard import HTTPAuthSessionWrapper
from twisted.web.iweb import ICredentialFactory
from twisted.web.resource import IResource
from zope.interface import implementer
Jean-Paul Calderone
committed
del awp
SCHEME = b"tahoe-lafs"
Jean-Paul Calderone
committed
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
class IToken(ICredentials):
def check(auth_token):
pass
@implementer(IToken)
@attr.s
class Token(object):
proposed_token = attr.ib(type=bytes)
def equals(self, valid_token):
return bytes_eq(
valid_token,
self.proposed_token,
)
@attr.s
class TokenChecker(object):
get_auth_token = attr.ib()
credentialInterfaces = [IToken]
def requestAvatarId(self, credentials):
required_token = self.get_auth_token()
if credentials.equals(required_token):
return succeed(ANONYMOUS)
return fail(Failure(UnauthorizedLogin()))
@implementer(ICredentialFactory)
@attr.s
class TokenCredentialFactory(object):
scheme = SCHEME
authentication_realm = b"tahoe-lafs"
def getChallenge(self, request):
return {b"realm": self.authentication_realm}
def decode(self, response, request):
return Token(response)
@implementer(IRealm)
@attr.s
class PrivateRealm(object):
_root = attr.ib()
def _logout(self):
pass
def requestAvatar(self, avatarId, mind, *interfaces):
if IResource in interfaces:
return (IResource, self._root, self._logout)
raise NotImplementedError(
"PrivateRealm supports IResource not {}".format(interfaces),
)
def _create_private_tree(get_auth_token, vulnerable):
realm = PrivateRealm(vulnerable)
portal = Portal(realm, [TokenChecker(get_auth_token)])
return HTTPAuthSessionWrapper(portal, [TokenCredentialFactory()])
def create_private_tree(get_auth_token, vulnerable_tree):
"""
Create a new resource tree that only allows requests if they include a
correct `Authorization: tahoe-lafs <api_auth_token>` header (where
`api_auth_token` matches the private configuration value).
:param (IO -> bytes) get_auth_token: Get the valid authorization token.
:param IResource vulnerable_tree: Create the resource
hierarchy which will be protected by the authorization mechanism.
"""
return _create_private_tree(
get_auth_token,
vulnerable_tree,
)