Newer
Older
Jean-Paul Calderone
committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# -*- coding: utf-8 -*-
# Tahoe-LAFS -- secure, distributed storage grid
#
# Copyright © 2020 The Tahoe-LAFS Software Foundation
#
# Copyright 2019 PrivateStorage.io, LLC
"""
Support code for applying token-based HTTP authorization rules to a
Twisted Web resource hierarchy.
"""
from __future__ import (
print_function,
unicode_literals,
absolute_import,
division,
)
import attr
from zope.interface import (
implementer,
)
from twisted.python.failure import (
Failure,
)
from twisted.internet.defer import (
succeed,
fail,
)
from twisted.cred.credentials import (
ICredentials,
)
from twisted.cred.portal import (
IRealm,
Portal,
)
from twisted.cred.checkers import (
ANONYMOUS,
)
from twisted.cred.error import (
UnauthorizedLogin,
)
from twisted.web.iweb import (
ICredentialFactory,
)
from twisted.web.resource import (
IResource,
)
from twisted.web.guard import (
HTTPAuthSessionWrapper,
)
from cryptography.hazmat.primitives.constant_time import (
bytes_eq,
)
# https://github.com/twisted/nevow/issues/106 may affect this code but if so
# then the hotfix Tahoe-LAFS applies should deal with it.
#
# We want to avoid depending on the Tahoe-LAFS Python API since it isn't
# public but we do want to make sure that hotfix is applied. This seems like
# an alright compromise.
import allmydata.web.private as awp
Jean-Paul Calderone
committed
del awp
# The authorization scheme name; assigned to ``TokenCredentialFactory.scheme``
# below, and matching the ``Authorization: tahoe-lafs <token>`` header form
# described in ``create_private_tree``.
SCHEME = b"tahoe-lafs"
Jean-Paul Calderone
committed
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
class IToken(ICredentials):
    """
    Credentials interface for an authorization token.

    NOTE(review): the ``Token`` class in this module exposes its comparison
    as ``equals`` and ``TokenChecker`` calls ``equals``; nothing visible in
    this module calls ``check``.  Confirm which name is intended.
    """
    def check(auth_token):
        """
        Presumably: test these credentials against ``auth_token`` -- the
        contract is not spelled out anywhere visible, so verify before
        relying on it.

        :param auth_token: The token to check against.
        """
@implementer(IToken)
@attr.s
class Token(object):
    """
    Credentials holding a token that has been proposed but not yet checked.

    :ivar bytes proposed_token: The token value as it was supplied to this
        object.
    """
    proposed_token = attr.ib(type=bytes)

    def equals(self, valid_token):
        """
        Compare the proposed token to ``valid_token`` using the
        ``cryptography`` library's constant-time ``bytes_eq``.

        :param bytes valid_token: The token which is known to be correct.

        :return: The result of ``bytes_eq`` for the two tokens.
        """
        return bytes_eq(
            valid_token,
            self.proposed_token,
        )

    def check(self, auth_token):
        """
        Provide the ``check`` method declared by ``IToken``.

        This class is declared to implement ``IToken``, whose only method is
        named ``check``; without this method the declaration is not
        satisfied.  It delegates to ``equals`` so both names behave
        identically.

        :param bytes auth_token: The token which is known to be correct.

        :return: The result of ``equals(auth_token)``.
        """
        return self.equals(auth_token)
@attr.s
class TokenChecker(object):
    """
    A credentials checker which accepts ``IToken`` credentials that match
    the currently valid token.

    :ivar get_auth_token: A no-argument callable returning the valid token.
        It is called once for every ``requestAvatarId`` call.
    """
    get_auth_token = attr.ib()

    # The credential interfaces this checker declares it can handle.
    credentialInterfaces = [IToken]

    def requestAvatarId(self, credentials):
        """
        Check ``credentials`` against the valid token.

        :param credentials: An object with an ``equals`` method accepting the
            valid token.

        :return: A Deferred that has already fired: with ``ANONYMOUS`` if the
            credentials match, otherwise with a ``Failure`` wrapping
            ``UnauthorizedLogin``.
        """
        expected = self.get_auth_token()
        if not credentials.equals(expected):
            return fail(Failure(UnauthorizedLogin()))
        return succeed(ANONYMOUS)
@implementer(ICredentialFactory)
@attr.s
class TokenCredentialFactory(object):
    """
    A credential factory for the ``SCHEME`` authorization scheme which turns
    the value supplied with that scheme into ``Token`` credentials.
    """
    scheme = SCHEME
    authentication_realm = b"tahoe-lafs"

    def getChallenge(self, request):
        """
        Build the challenge parameters.

        :param request: Unused.

        :return dict: A single ``realm`` entry carrying
            ``authentication_realm``.
        """
        challenge = {}
        challenge[b"realm"] = self.authentication_realm
        return challenge

    def decode(self, response, request):
        """
        Wrap ``response`` in a ``Token`` without validating it.

        :param response: The value to use as the proposed token.
        :param request: Unused.

        :return Token: Credentials holding ``response``.
        """
        return Token(proposed_token=response)
@implementer(IRealm)
@attr.s
class PrivateRealm(object):
    """
    A realm which hands out one fixed resource as the avatar for every
    ``IResource`` request.

    :ivar _root: The object returned as the avatar.
    """
    _root = attr.ib()

    def _logout(self):
        """
        Do nothing; supplied as the logout callable of the avatar tuple.
        """

    def requestAvatar(self, avatarId, mind, *interfaces):
        """
        Return the root resource if ``IResource`` was requested.

        :param avatarId: Unused.
        :param mind: Unused.
        :param interfaces: The interfaces the caller is willing to accept.

        :raise NotImplementedError: If ``IResource`` is not among
            ``interfaces``.

        :return: A three-tuple of ``IResource``, the root resource, and the
            logout callable.
        """
        if IResource not in interfaces:
            raise NotImplementedError(
                "PrivateRealm supports IResource not {}".format(interfaces),
            )
        return (IResource, self._root, self._logout)
def _create_private_tree(get_auth_token, vulnerable):
    """
    Wrap ``vulnerable`` in an ``HTTPAuthSessionWrapper`` backed by a portal
    that checks tokens with ``TokenChecker``.

    :param get_auth_token: Passed to ``TokenChecker`` as the source of the
        valid token.
    :param vulnerable: The resource handed to ``PrivateRealm`` as its root.

    :return: The ``HTTPAuthSessionWrapper`` instance.
    """
    checkers = [TokenChecker(get_auth_token)]
    credential_factories = [TokenCredentialFactory()]
    portal = Portal(PrivateRealm(vulnerable), checkers)
    return HTTPAuthSessionWrapper(portal, credential_factories)
def create_private_tree(get_auth_token, vulnerable_tree):
    """
    Build a resource tree which only serves requests carrying a correct
    `Authorization: tahoe-lafs <api_auth_token>` header, where
    `api_auth_token` matches the private configuration value.

    :param (IO -> bytes) get_auth_token: A callable which returns the valid
        authorization token.

    :param IResource vulnerable_tree: The resource hierarchy to place behind
        the authorization mechanism.

    :return: Whatever ``_create_private_tree`` returns for these arguments.
    """
    return _create_private_tree(get_auth_token, vulnerable_tree)