Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# Copyright 2019 PrivateStorage.io, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module implements a service which periodically spends ZKAPs to
refresh leases on all shares reachable from a root.
"""
from functools import (
partial,
)
from datetime import (
datetime,
timedelta,
)
import attr
from twisted.internet.defer import (
inlineCallbacks,
maybeDeferred,
)
from twisted.application.service import (
Service,
)
from twisted.python.log import (
err,
)
from allmydata.interfaces import (
IDirectoryNode,
)
from allmydata.util.hashutil import (
file_renewal_secret_hash,
bucket_renewal_secret_hash,
)
@inlineCallbacks
def visit_storage_indexes(root_node, visit):
"""
Call a visitor with the storage index of ``root_node`` and that of all
nodes reachable from it.
:param IFilesystemNode root_node: The node from which to start.
:param visit: A one-argument callable. It will be called with the storage
index of all visited nodes.
:return Deferred: A Deferred which fires after all nodes have been
visited.
"""
stack = [root_node]
while stack:
elem = stack.pop()
visit(elem.get_storage_index())
if IDirectoryNode.providedBy(elem):
children = yield elem.list()
# Produce consistent results by forcing some consistent ordering
# here. This will sort by name.
stable_children = sorted(children.items())
for (name, (child_node, child_metadata)) in stable_children:
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
stack.append(child_node)
def iter_storage_indexes(visit_assets):
"""
Get an iterator over storage indexes of all nodes visited by
``visit_assets``.
:param visit_assets: A one-argument function which takes a visit function
and calls it with all nodes to visit.
:return Deferred[list[bytes]]: A Deferred that fires with a list of
storage indexes from the visited nodes. The list is in an arbitrary
order and does not include duplicates if any nodes were visited more
than once.
"""
storage_indexes = set()
visit = storage_indexes.add
d = visit_assets(visit)
# Create some order now that we've ensured they're unique.
d.addCallback(lambda ignored: list(storage_indexes))
return d
@inlineCallbacks
def renew_leases(visit_assets, storage_broker, secret_holder, min_lease_remaining, now):
"""
Check the leases on a group of nodes for those which are expired or close
to expiring and renew such leases.
:param visit_assets: A one-argument callable which takes a visitor
function and calls it with the storage index of every node to check.
:param StorageFarmBroker storage_broker: A storage broker which can supply
the storage servers where the nodes should be checked.
:param SecretHolder secret_holder: The source of the renew secret for any
leases which require renewal.
:param timedelta min_lease_remaining: The minimum amount of time remaining
to allow on a lease without renewing it.
:param now: A no-argument function returning the current time, as a
datetime instance, for comparison against lease expiration time.
:return Deferred: A Deferred which fires when all visitable nodes have
been checked and any leases renewed which required it.
"""
storage_indexes = yield iter_storage_indexes(visit_assets)
renewal_secret = secret_holder.get_renewal_secret()
servers = storage_broker.get_connected_servers()
for server in servers:
# Consider parallelizing this.
yield renew_leases_on_server(
min_lease_remaining,
renewal_secret,
storage_indexes,
server,
now(),
)
@inlineCallbacks
def renew_leases_on_server(
min_lease_remaining,
renewal_secret,
storage_indexes,
server,
now,
):
"""
Check leases on the shares for the given storage indexes on the given
storage server for those which are expired or close to expiring and renew
such leases.
:param timedelta min_lease_remaining: The minimum amount of time remaining
to allow on a lease without renewing it.
:param renewal_secret: A seed for the renewal secret hash calculation for
any leases which need to be renewed.
:param list[bytes] storage_indexes: The storage indexes to check.
:param StorageServer server: The storage server on which to check.
:param datetime now: The current time for comparison against the least
expiration time.
:return Deferred: A Deferred which fires after all storage indexes have
been checked and any leases that need renewal have been renewed.
"""
stats = yield server.stat_shares(storage_indexes)
for storage_index, stat in zip(storage_indexes, stats):
if needs_lease_renew(min_lease_remaining, stat, now):
yield renew_lease(renewal_secret, storage_index, server)
def renew_lease(renewal_secret, storage_index, server):
"""
Renew the lease on the shares in one storage index on one server.
:param renewal_secret: A seed for the renewal secret hash calculation for
any leases which need to be renewed.
:param bytes storage_index: The storage index to operate on.
:param StorageServer server: The storage server to operate on.
:return Deferred: A Deferred that fires when the lease has been renewed.
"""
# See allmydata/immutable/checker.py, _get_renewal_secret
renew_secret = bucket_renewal_secret_hash(
file_renewal_secret_hash(
renewal_secret,
storage_index,
),
server.get_lease_seed(),
)
return server.renew_lease(
storage_index,
renew_secret,
)
def needs_lease_renew(min_lease_remaining, stat, now):
"""
Determine if a lease needs renewal.
:param timedelta min_lease_remaining: The minimum amount of time remaining
to allow on a lease without renewing it.
:param ShareStat stat: The metadata about a share to consider.
:param datetime now: The current time for comparison against the lease
expiration time.
:return bool: ``True`` if the lease needs to be renewed, ``False``
otherwise.
"""
remaining = datetime.utcfromtimestamp(stat.lease_expiration) - now
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
return remaining < min_lease_remaining
@attr.s
class _FuzzyTimerService(Service):
"""
A service to periodically, but not *too* periodically, run an operation.
:ivar operation: A no-argument callable to fuzzy-periodically run. It may
return a Deferred in which case the next run will not be scheduled
until the Deferred fires.
:ivar timedelta initial_interval: The amount of time to wait before the first
run of the operation.
:ivar sample_interval_distribution: A no-argument callable which returns a
number of seconds as a float giving the amount of time to wait before
the next run of the operation. It will be called each time the
operation completes.
:ivar IReactorTime reactor: A Twisted reactor to use to schedule runs of
the operation.
"""
operation = attr.ib()
initial_interval = attr.ib()
sample_interval_distribution = attr.ib()
reactor = attr.ib()
def startService(self):
Service.startService(self)
self.reactor.callLater(
self.initial_interval.total_seconds(),
self._iterate,
)
def _iterate(self):
"""
Run the operation once and then schedule it to run again.
"""
d = maybeDeferred(self.operation)
d.addErrback(err, "Fuzzy timer service ({})")
d.addCallback(lambda ignored: self._schedule())
def _schedule(self):
"""
Schedule the next run of the operation.
"""
self._call = self.reactor.callLater(
self.sample_interval_distribution().total_seconds(),
self._iterate,
)
def lease_maintenance_service(
interval_mean=None,
interval_range=None,
):
"""
Get an ``IService`` which will maintain leases on ``root_node`` and any
nodes directly or transitively reachable from it.
:param IReactorClock reactor: A Twisted reactor for scheduling renewal
activity.
:param datetime last_run: The time at which lease maintenance last ran to
inform an adjustment to the first interval before running it again, or
``None`` not to make such an adjustment.
:param random: An object like ``random.Random`` which can be used as a
source of scheduling delay.
:param timedelta interval_mean: The mean time between lease renewal checks.
:param timedelta interval_range: The range of the uniform distribution of
lease renewal checks (centered on ``interval_mean``).
:param maintain_leases: A no-argument callable which performs a round of
lease-maintenance. The resulting service calls this periodically.
if interval_mean is None:
interval_mean = timedelta(days=26)
if interval_range is None:
interval_range = timedelta(days=4)
halfrange = interval_range / 2
def sample_interval_distribution():
return timedelta(
seconds=random.uniform(
(interval_mean - halfrange).total_seconds(),
(interval_mean + halfrange).total_seconds(),
),
)
if last_run is None:
initial_interval = sample_interval_distribution()
else:
initial_interval = calculate_initial_interval(
sample_interval_distribution,
last_run,
datetime.utcfromtimestamp(reactor.seconds()),
)
initial_interval = max(
initial_interval,
timedelta(0),
)
initial_interval,
sample_interval_distribution,
reactor,
)
def visit_storage_indexes_from_root(visitor, root_node):
"""
An operation for ``lease_maintenance_service`` which applies the given
visitor to ``root_node`` and all its children.
:param visitor: A one-argument callable which takes the traversal function
and which should call it as desired.
:param IFilesystemNode root_node: The filesystem node at which traversal
will begin.
:return: A no-argument callable to perform the visits.
"""
return partial(
visitor,
partial(visit_storage_indexes, root_node),
)
def maintain_leases_from_root(root_node, storage_broker, secret_holder, min_lease_remaining, get_now):
"""
An operation for ``lease_maintenance_service`` which visits ``root_node``
and all its children and renews their leases if they have
``min_lease_remaining`` or less on them.
:param IFilesystemNode root_node: A Tahoe-LAFS filesystem node to use as
the root of a node hierarchy to be maintained.
:param StorageFarmBroker storage_broker: The storage broker which can put
us in touch with storage servers where shares of the nodes to maintain
might be found.
:param SecretHolder secret_holder: The Tahoe-LAFS client node secret
holder which can give us the lease renewal secrets needed to renew
leases.
:param timedelta min_lease_remaining: The minimum amount of time remaining
to allow on a lease without renewing it.
:param get_now: A no-argument callable that returns the current time as a
``datetime`` instance.
:return: A no-argument callable to perform the maintenance.
def visitor(visit_assets):
return renew_leases(
visit_assets,
storage_broker,
secret_holder,
min_lease_remaining,
get_now,
)
return visit_storage_indexes_from_root(
visitor,
root_node,
def calculate_initial_interval(sample_interval_distribution, last_run, now):
"""
Determine how long to wait before performing an initial (for this process)
scan for aging leases.
:param sample_interval_distribution: See ``_FuzzyTimerService``.
:param datetime last_run: The time of the last scan.
:param datetime now: The current time.
"""
since_last_run = now - last_run
initial_interval = sample_interval_distribution() - since_last_run
return initial_interval