Newer
Older
# Copyright 2019 PrivateStorage.io, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module implements a service which periodically spends ZKAPs to
refresh leases on all shares reachable from a root.
"""
from datetime import datetime, timedelta
from errno import ENOENT
from functools import partial

import attr
from allmydata.interfaces import IDirectoryNode, IFilesystemNode
from allmydata.util.hashutil import (
    bucket_cancel_secret_hash,
    bucket_renewal_secret_hash,
    file_cancel_secret_hash,
    file_renewal_secret_hash,
)
from isodate import duration_isoformat, parse_datetime, parse_duration
from twisted.application.service import Service
from twisted.internet.defer import inlineCallbacks, maybeDeferred
from twisted.python.log import err
from zope.interface import implementer

from .controller import bracket
from .model import ILeaseMaintenanceObserver
SERVICE_NAME = u"lease maintenance service"
@inlineCallbacks
def visit_storage_indexes(root_nodes, visit):
    """
    Call a visitor with the storage index of every node in ``root_nodes`` and
    that of all nodes reachable from them.

    :param list[IFilesystemNode] root_nodes: The nodes from which to start.
        The list itself is not modified.

    :param visit: A one-argument callable.  It will be called with the storage
        index of all visited nodes.

    :return Deferred: A Deferred which fires after all nodes have been
        visited.  It fails with ``TypeError`` if ``root_nodes`` is not a list
        or if any of its elements does not provide ``IFilesystemNode``.
    """
    if not isinstance(root_nodes, list):
        raise TypeError(
            "root_nodes must be a list, not {!r}".format(
                root_nodes,
            )
        )
    for node in root_nodes:
        if not IFilesystemNode.providedBy(node):
            raise TypeError(
                "Root nodes must provide IFilesystemNode, {!r} does not".format(
                    node,
                )
            )

    # Traverse depth-first using an explicit stack.  Work on a copy so the
    # caller's list is not consumed by the traversal.
    stack = list(root_nodes)
    while stack:
        elem = stack.pop()
        visit(elem.get_storage_index())
        if IDirectoryNode.providedBy(elem):
            children = yield elem.list()
            # Produce consistent results by forcing some consistent ordering
            # here.  This will sort by name.
            stable_children = sorted(children.items())
            for (_name, (child_node, _child_metadata)) in stable_children:
                stack.append(child_node)
def iter_storage_indexes(visit_assets):
    """
    Collect the distinct storage indexes of all nodes visited by
    ``visit_assets``.

    :param visit_assets: A one-argument function which takes a visit function
        and calls it with the storage index of every node to visit.  It must
        return a Deferred that fires once visiting is complete.

    :return Deferred[list[bytes]]: A Deferred that fires with a list of
        storage indexes from the visited nodes.  The list is in an arbitrary
        order and does not include duplicates if any nodes were visited more
        than once.
    """
    # A set gives de-duplication for free; its ``add`` method is the visitor.
    seen = set()

    def as_list(_ignored):
        # Snapshot the set only after the traversal has finished.
        return list(seen)

    finished = visit_assets(seen.add)
    finished.addCallback(as_list)
    return finished
@inlineCallbacks
def renew_leases(
    visit_assets,
    storage_broker,
    secret_holder,
    min_lease_remaining,
    get_activity_observer,
    now,
):
    """
    Check the leases on a group of nodes for those which are expired or close
    to expiring and renew such leases.

    :param visit_assets: A one-argument callable which takes a visitor
        function and calls it with the storage index of every node to check.

    :param StorageFarmBroker storage_broker: A storage broker which can supply
        the storage servers where the nodes should be checked.

    :param SecretHolder secret_holder: The source of the renew secret for any
        leases which require renewal.

    :param timedelta min_lease_remaining: The minimum amount of time remaining
        to allow on a lease without renewing it.

    :param get_activity_observer: A no-argument callable which returns an
        ``ILeaseMaintenanceObserver``.

    :param now: A no-argument function returning the current time, as a
        datetime instance, for comparison against lease expiration time.

    :return Deferred: A Deferred which fires when all visitable nodes have
        been checked and any leases renewed which required it.
    """
    activity = get_activity_observer()

    storage_indexes = yield iter_storage_indexes(visit_assets)

    renewal_secret = secret_holder.get_renewal_secret()
    cancel_secret = secret_holder.get_cancel_secret()

    # Snapshot the connected servers up front so the set of servers does not
    # change underneath the loop while we wait on each one.
    servers = [
        server.get_storage_server()
        for server in storage_broker.get_connected_servers()
    ]

    for server in servers:
        # Consider parallelizing this.
        yield renew_leases_on_server(
            min_lease_remaining,
            renewal_secret,
            cancel_secret,
            storage_indexes,
            server,
            activity,
            # ``renew_leases_on_server`` wants a datetime, not the clock.
            now(),
        )

    activity.finish()
@inlineCallbacks
def renew_leases_on_server(
    min_lease_remaining,
    renewal_secret,
    cancel_secret,
    storage_indexes,
    server,
    activity,
    now,
):
    """
    Inspect the shares a single storage server holds for the given storage
    indexes and renew the leases on any that are expired or close to
    expiring.

    :param timedelta min_lease_remaining: The minimum amount of time remaining
        to allow on a lease without renewing it.

    :param renewal_secret: See ``renew_lease``.

    :param cancel_secret: See ``renew_lease``.

    :param list[bytes] storage_indexes: The storage indexes to check.

    :param StorageServer server: The storage server on which to check.

    :param ILeaseMaintenanceObserver activity: An object which will receive
        events allowing it to observe the lease maintenance activity.

    :param datetime now: The current time for comparison against the lease
        expiration time.

    :return Deferred: A Deferred which fires after all storage indexes have
        been checked and any leases that need renewal have been renewed.
    """
    per_index_stats = yield server.stat_shares(storage_indexes)

    for index, shares in zip(storage_indexes, per_index_stats):
        # An empty result means the server has no shares for this storage
        # index, so there is nothing to observe or renew.
        if shares:
            # Keep track of what's been seen.
            activity.observe([share.size for share in shares.values()])

            # Every share carries its own leases, each with its own
            # expiration, and the server reports only the longest-lived lease
            # per share.
            #
            # Renewal is all-or-nothing for a storage index: there is no API
            # to renew just *some* shares.  So the decision is driven by the
            # share that will lose lease coverage first.
            endangered = soonest_expiration(shares.values())
            if needs_lease_renew(min_lease_remaining, endangered, now):
                yield renew_lease(renewal_secret, cancel_secret, index, server)
def soonest_expiration(stats):
    # type: (Iterable[ShareStat]) -> ShareStat
    """
    Pick the share stat whose lease runs out first.

    :param stats: The share stats to choose from.  Must not be empty.

    :return: The share stat from ``stats`` with a lease which expires before
        all others.  On a tie the earliest such element of ``stats`` wins.
    """

    def expiration(share_stat):
        return share_stat.lease_expiration

    return min(stats, key=expiration)
def renew_lease(renewal_secret, cancel_secret, storage_index, server):
    """
    Renew the lease on the shares in one storage index on one server.

    :param renewal_secret: A seed for the renewal secret hash calculation for
        any leases which need to be renewed.

    :param cancel_secret: A seed for the cancel secret hash calculation for
        any leases which need to be renewed.

    :param bytes storage_index: The storage index to operate on.

    :param StorageServer server: The storage server to operate on.

    :return Deferred: A Deferred that fires when the lease has been renewed.
    """
    lease_seed = server.get_lease_seed()

    # See allmydata/immutable/checker.py, _get_renewal_secret
    bucket_renew_secret = bucket_renewal_secret_hash(
        file_renewal_secret_hash(
            renewal_secret,
            storage_index,
        ),
        lease_seed,
    )
    bucket_cancel_secret = bucket_cancel_secret_hash(
        file_cancel_secret_hash(
            cancel_secret,
            storage_index,
        ),
        lease_seed,
    )

    # Use add_lease to add a new lease *or* renew an existing one with a
    # matching renew secret.
    return server.add_lease(
        storage_index,
        bucket_renew_secret,
        bucket_cancel_secret,
    )
def needs_lease_renew(min_lease_remaining, stat, now):
    """
    Decide whether the lease described by ``stat`` is due for renewal.

    :param timedelta min_lease_remaining: The minimum amount of time remaining
        to allow on a lease without renewing it.

    :param ShareStat stat: The metadata about a share to consider.  Its
        ``lease_expiration`` is interpreted as a POSIX timestamp.

    :param datetime now: The current time (naive, UTC) for comparison against
        the lease expiration time.

    :return bool: ``True`` if strictly less than ``min_lease_remaining`` is
        left on the lease, ``False`` otherwise.
    """
    expires_at = datetime.utcfromtimestamp(stat.lease_expiration)
    time_left = expires_at - now
    if time_left < min_lease_remaining:
        return True
    return False
@attr.s
class _FuzzyTimerService(Service):
    """
    A service to periodically, but not *too* periodically, run an operation.

    :ivar operation: A no-argument callable to fuzzy-periodically run.  It may
        return a Deferred in which case the next run will not be scheduled
        until the Deferred fires.

    :ivar timedelta initial_interval: The amount of time to wait before the
        first run of the operation.

    :ivar sample_interval_distribution: A no-argument callable which returns a
        ``timedelta`` giving the amount of time to wait before the next run
        of the operation.  It will be called each time the operation
        completes.

    :ivar get_config: A function to call to return the service's
        configuration.  The configuration is represented as a service-specific
        object.

    :ivar IReactorTime reactor: A Twisted reactor to use to schedule runs of
        the operation.
    """

    operation = attr.ib()
    initial_interval = attr.ib()
    sample_interval_distribution = attr.ib()
    get_config = attr.ib()  # type: () -> Any
    reactor = attr.ib()

    # The most recently scheduled delayed call, or ``None`` when nothing has
    # been scheduled (before start, after stop).  Not an attrs field.
    _call = None

    def startService(self):
        """
        Start the service and schedule the first run of the operation.
        """
        Service.startService(self)
        self._call = self.reactor.callLater(
            self.initial_interval.total_seconds(),
            self._iterate,
        )

    def stopService(self):
        """
        Stop the service and cancel the pending run, if there is one.
        """
        call, self._call = self._call, None
        # While the operation is in flight the last delayed call has already
        # fired; cancelling it then would raise ``AlreadyCalled``.
        if call is not None and call.active():
            call.cancel()
        return Service.stopService(self)

    def _iterate(self):
        """
        Run the operation once and then schedule it to run again.
        """
        d = maybeDeferred(self.operation)
        # Log failures instead of propagating them so that one bad run does
        # not prevent the next run from being scheduled.
        d.addErrback(err, "Fuzzy timer service ({})".format(self.name))
        d.addCallback(lambda ignored: self._schedule())

    def _schedule(self):
        """
        Schedule the next run of the operation.

        Nothing is scheduled if the service is not running, so an operation
        which completes after ``stopService`` does not re-arm the timer.
        """
        if not self.running:
            return
        self._call = self.reactor.callLater(
            self.sample_interval_distribution().total_seconds(),
            self._iterate,
        )
def lease_maintenance_service(
    maintain_leases,
    reactor,
    last_run_path,
    random,
    lease_maint_config,
):
    """
    Get an ``IService`` which will periodically perform lease maintenance by
    calling ``maintain_leases``.

    :param maintain_leases: A no-argument callable which performs a round of
        lease-maintenance.  The resulting service calls this periodically.

    :param IReactorClock reactor: A Twisted reactor for scheduling renewal
        activity.

    :param FilePath last_run_path: A path containing the time (as an ISO8601
        datetime string) at which lease maintenance last ran to inform an
        adjustment to the first interval before running it again.  If no file
        exists at the path it is treated as though there has been no previous
        run.  The path will also be rewritten on each run to update this
        value.

    :param random: An object like ``random.Random`` which can be used as a
        source of scheduling delay.

    :param lease_maint_config: Configuration for the tweakable lease
        maintenance parameters.

    :return: The lease maintenance service, not yet started.
    """
    interval_mean = lease_maint_config.crawl_interval_mean
    interval_range = lease_maint_config.crawl_interval_range
    halfrange = interval_range / 2

    def sample_interval_distribution():
        # Uniform over [mean - range / 2, mean + range / 2].
        return timedelta(
            seconds=random.uniform(
                (interval_mean - halfrange).total_seconds(),
                (interval_mean + halfrange).total_seconds(),
            ),
        )

    # Rather than an all-or-nothing last-run time we probably eventually want
    # to have a more comprehensive record of the state when we were last
    # interrupted.  This would remove the unfortunate behavior of restarting
    # from the beginning if we shut down during a lease scan.  Shutting down
    # during a lease scan becomes increasingly likely the more shares there
    # are to check.
    last_run = read_time_from_path(last_run_path)
    if last_run is None:
        initial_interval = sample_interval_distribution()
    else:
        initial_interval = calculate_initial_interval(
            sample_interval_distribution,
            last_run,
            datetime.utcfromtimestamp(reactor.seconds()),
        )
    # A run that is already overdue is scheduled immediately rather than in
    # the past.
    initial_interval = max(
        initial_interval,
        timedelta(0),
    )

    def get_lease_maint_config():
        return lease_maint_config

    def run_maintenance():
        # NOTE(review): ``bracket`` is defined in ``.controller``; presumably
        # it runs the last argument between the first two -- confirm.
        return bracket(
            lambda: None,
            lambda: write_time_to_path(
                last_run_path,
                datetime.utcfromtimestamp(reactor.seconds()),
            ),
            maintain_leases,
        )

    service = _FuzzyTimerService(
        operation=run_maintenance,
        initial_interval=initial_interval,
        sample_interval_distribution=sample_interval_distribution,
        get_config=get_lease_maint_config,
        reactor=reactor,
    )
    # ``_FuzzyTimerService._iterate`` interpolates ``self.name`` into its
    # error log message, so give the service its module-level name.
    service.setName(SERVICE_NAME)
    return service
@attr.s(frozen=True)
class LeaseMaintenanceConfig(object):
    """
    Represent the configuration for a lease maintenance service.

    Instances are immutable (the attrs class is declared ``frozen``).

    :ivar crawl_interval_mean: The mean time between lease renewal checks.

    :ivar crawl_interval_range: The range of the uniform distribution of lease
        renewal checks (centered on ``crawl_interval_mean``).

    :ivar min_lease_remaining: The minimum amount of time remaining to allow
        on a lease without renewing it.
    """

    crawl_interval_mean = attr.ib()  # type: datetime.timedelta
    crawl_interval_range = attr.ib()  # type: datetime.timedelta
    min_lease_remaining = attr.ib()  # type: datetime.timedelta
Jean-Paul Calderone
committed
def lease_maintenance_config_to_dict(lease_maint_config):
    # type: (LeaseMaintenanceConfig) -> Dict[str, str]
    """
    Serialize a lease maintenance configuration to a flat dictionary.

    :param LeaseMaintenanceConfig lease_maint_config: The configuration to
        serialize.

    :return: A dictionary mapping configuration item names to the ISO8601
        duration strings for the corresponding configuration values.
    """
    durations = (
        ("lease.crawl-interval.mean", lease_maint_config.crawl_interval_mean),
        ("lease.crawl-interval.range", lease_maint_config.crawl_interval_range),
        ("lease.min-time-remaining", lease_maint_config.min_lease_remaining),
    )
    return {key: duration_isoformat(value) for key, value in durations}
def lease_maintenance_config_from_dict(d):
    # type: (Dict[str, str]) -> LeaseMaintenanceConfig
    """
    Build a lease maintenance configuration from a flat dictionary.

    :param d: A dictionary holding ISO8601 duration strings under the keys
        ``lease.crawl-interval.mean``, ``lease.crawl-interval.range`` and
        ``lease.min-time-remaining``.

    :return LeaseMaintenanceConfig: The configuration those values describe.
    """
    mean = parse_duration(d["lease.crawl-interval.mean"])
    spread = parse_duration(d["lease.crawl-interval.range"])
    remaining = parse_duration(d["lease.min-time-remaining"])
    return LeaseMaintenanceConfig(
        crawl_interval_mean=mean,
        crawl_interval_range=spread,
        min_lease_remaining=remaining,
    )
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
def write_time_to_path(path, when):
    """
    Write an ISO8601 datetime string to a file.

    :param FilePath path: The path to a file to which to write the datetime
        string.

    :param datetime when: The datetime to write.
    """
    # ``FilePath.setContent`` takes bytes.  An ISO8601 string is pure ASCII
    # so this encoding cannot fail.
    path.setContent(when.isoformat().encode("ascii"))
def read_time_from_path(path):
    """
    Read an ISO8601 datetime string from a file.

    :param FilePath path: The path to a file containing a datetime string.

    :raise IOError: If the file cannot be read for any reason other than not
        existing.

    :return: None if no file exists at the path.  Otherwise, a datetime
        instance giving the time represented in the file.
    """
    try:
        when = path.getContent()
    except IOError as e:
        # Only a missing file means "never ran"; anything else is a real
        # problem the caller should hear about.
        if ENOENT == e.errno:
            return None
        raise

    # ``FilePath.getContent`` returns bytes but the parser works on text.
    if isinstance(when, bytes):
        when = when.decode("ascii")
    return parse_datetime(when)
def visit_storage_indexes_from_root(visitor, get_root_nodes):
    """
    An operation for ``lease_maintenance_service`` which applies the given
    visitor to the root nodes and all their children.

    :param visitor: A one-argument callable which takes the traversal function
        and which should call it as desired.

    :param get_root_nodes: A no-argument callable which returns a list of
        filesystem nodes (``IFilesystemNode``) at which traversal will begin.

    :return: A no-argument callable to perform the visits.
    """

    def perform_visits():
        # Make sure we call get_root_nodes each time to give us a chance
        # to notice when it changes.
        root_nodes = get_root_nodes()
        return visitor(partial(visit_storage_indexes, root_nodes))

    return perform_visits
@implementer(ILeaseMaintenanceObserver)
class NoopMaintenanceObserver(object):
    """
    A lease maintenance observer which ignores every event it is given.
    """

    def observe(self, sizes):
        """
        Discard the observed share sizes.
        """

    def finish(self):
        """
        Ignore the end of the maintenance run.
        """
@implementer(ILeaseMaintenanceObserver)
@attr.s
class MemoryMaintenanceObserver(object):
    """
    A lease maintenance observer that records observations in memory.

    :ivar list observed: Every ``sizes`` value passed to ``observe``, in the
        order the calls were made.

    :ivar bool finished: ``True`` once ``finish`` has been called, ``False``
        before that.
    """

    # A fresh list per instance (via ``attr.Factory``) so instances do not
    # share recorded observations.
    observed = attr.ib(default=attr.Factory(list))
    finished = attr.ib(default=False)

    def observe(self, sizes):
        # Record the value exactly as given; no copy is made.
        self.observed.append(sizes)

    def finish(self):
        # Remember that the run ended.
        self.finished = True
def maintain_leases_from_root(
    get_root_nodes,
    storage_broker,
    secret_holder,
    min_lease_remaining,
    progress,
    get_now,
):
    """
    An operation for ``lease_maintenance_service`` which visits the root nodes
    and all their children and renews their leases if they have
    ``min_lease_remaining`` or less on them.

    :param get_root_nodes: A no-argument callable which returns the list of
        Tahoe-LAFS filesystem nodes (``IFilesystemNode``) to use as the roots
        of the node hierarchies to be maintained.

    :param StorageFarmBroker storage_broker: The storage broker which can put
        us in touch with storage servers where shares of the nodes to maintain
        might be found.

    :param SecretHolder secret_holder: The Tahoe-LAFS client node secret
        holder which can give us the lease renewal secrets needed to renew
        leases.

    :param timedelta min_lease_remaining: The minimum amount of time remaining
        to allow on a lease without renewing it.

    :param progress: A no-argument callable which returns an
        ``ILeaseMaintenanceObserver``.  It is handed to ``renew_leases`` as
        its ``get_activity_observer`` argument.

    :param get_now: A no-argument callable that returns the current time as a
        ``datetime`` instance.

    :return: A no-argument callable to perform the maintenance.
    """

    def visitor(visit_assets):
        return renew_leases(
            visit_assets,
            storage_broker,
            secret_holder,
            min_lease_remaining,
            progress,
            get_now,
        )

    return visit_storage_indexes_from_root(
        visitor,
        get_root_nodes,
    )
def calculate_initial_interval(sample_interval_distribution, last_run, now):
    """
    Work out how long this process should wait before its first scan for
    aging leases, given when the previous scan happened.

    :param sample_interval_distribution: See ``_FuzzyTimerService``.

    :param datetime last_run: The time of the last scan.

    :param datetime now: The current time.

    :return timedelta: A freshly sampled interval reduced by the time already
        elapsed since ``last_run``.  This is negative when the scan is
        overdue.
    """
    elapsed = now - last_run
    sampled = sample_interval_distribution()
    return sampled - elapsed