From 84eb9cc3b581b81e499dcae6ea03e31011347fb7 Mon Sep 17 00:00:00 2001 From: Jean-Paul Calderone <exarkun@twistedmatrix.com> Date: Wed, 11 Dec 2019 13:56:51 -0500 Subject: [PATCH] [wip] lease maintenance --- src/_zkapauthorizer/lease_maintenance.py | 342 ++++++++++++++++++ src/_zkapauthorizer/resource.py | 7 + .../tests/test_lease_maintenance.py | 163 +++++++++ 3 files changed, 512 insertions(+) create mode 100644 src/_zkapauthorizer/lease_maintenance.py create mode 100644 src/_zkapauthorizer/tests/test_lease_maintenance.py diff --git a/src/_zkapauthorizer/lease_maintenance.py b/src/_zkapauthorizer/lease_maintenance.py new file mode 100644 index 0000000..3101909 --- /dev/null +++ b/src/_zkapauthorizer/lease_maintenance.py @@ -0,0 +1,342 @@ +# Copyright 2019 PrivateStorage.io, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This module implements a service which periodically spends ZKAPs to +refresh leases on all shares reachable from a root. +""" + +from functools import ( + partial, +) +from datetime import ( + datetime, + timedelta, +) + +import attr + +from twisted.internet.defer import ( + inlineCallbacks, + maybeDeferred, +) +from twisted.application.service import ( + Service, +) +from twisted.python.log import ( + err, +) + +from allmydata.interfaces import ( + IDirectoryNode, +) +from allmydata.util.hashutil import ( + file_renewal_secret_hash, + bucket_renewal_secret_hash, +) + +@inlineCallbacks +def visit_filesystem_nodes(root_node, visit): + """ + Call a visitor with the storage index of ``root_node`` and that of all + nodes reachable from it. + + :param IFilesystemNode root_node: The node from which to start. + + :param visit: A one-argument callable. It will be called with the storage + index of all visited nodes. + + :return Deferred: A Deferred which fires after all nodes have been + visited. + """ + stack = [root_node] + while stack: + elem = stack.pop() + visit(elem.get_storage_index()) + if IDirectoryNode.providedBy(elem): + for (child_node, child_metadata) in (yield elem.list()): + stack.append(child_node) + + +def iter_storage_indexes(visit_assets): + """ + Get an iterator over storage indexes of all nodes visited by + ``visit_assets``. + + :param visit_assets: A one-argument function which takes a visit function + and calls it with all nodes to visit. + + :return Deferred[list[bytes]]: A Deferred that fires with a list of + storage indexes from the visited nodes. The list is in an arbitrary + order and does not include duplicates if any nodes were visited more + than once. + """ + storage_indexes = set() + visit = storage_indexes.add + d = visit_assets(visit) + # Create some order now that we've ensured they're unique. + d.addCallback(lambda ignored: list(storage_indexes)) + return d + + +@inlineCallbacks +def renew_leases(visit_assets, storage_broker, secret_holder, min_lease_remaining, now): + """ + Check the leases on a group of nodes for those which are expired or close + to expiring and renew such leases. + + :param visit_assets: A one-argument callable which takes a visitor + function and calls it with the storage index of every node to check. + + :param StorageFarmBroker storage_broker: A storage broker which can supply + the storage servers where the nodes should be checked. + + :param SecretHolder secret_holder: The source of the renew secret for any + leases which require renewal. + + :param timedelta min_lease_remaining: The minimum amount of time remaining + to allow on a lease without renewing it. + + :param now: A no-argument function returning the current time, as a + datetime instance, for comparison against lease expiration time. + + :return Deferred: A Deferred which fires when all visitable nodes have + been checked and any leases renewed which required it. + """ + storage_indexes = yield iter_storage_indexes(visit_assets) + + renewal_secret = secret_holder.get_renewal_secret() + servers = storage_broker.get_connected_servers() + + for server in servers: + # Consider parallelizing this. + yield renew_leases_on_server( + min_lease_remaining, + renewal_secret, + storage_indexes, + server, + now(), + ) + + +@inlineCallbacks +def renew_leases_on_server( + min_lease_remaining, + renewal_secret, + storage_indexes, + server, + now, +): + """ + Check leases on the shares for the given storage indexes on the given + storage server for those which are expired or close to expiring and renew + such leases. + + :param timedelta min_lease_remaining: The minimum amount of time remaining + to allow on a lease without renewing it. + + :param renewal_secret: A seed for the renewal secret hash calculation for + any leases which need to be renewed. + + :param list[bytes] storage_indexes: The storage indexes to check. + + :param StorageServer server: The storage server on which to check. + + :param datetime now: The current time for comparison against the least + expiration time. + + :return Deferred: A Deferred which fires after all storage indexes have + been checked and any leases that need renewal have been renewed. + """ + stats = yield server.stat_shares(storage_indexes) + for storage_index, stat in zip(storage_indexes, stats): + if needs_lease_renew(min_lease_remaining, stat, now): + yield renew_lease(renewal_secret, storage_index, server) + + +def renew_lease(renewal_secret, storage_index, server): + """ + Renew the lease on the shares in one storage index on one server. + + :param renewal_secret: A seed for the renewal secret hash calculation for + any leases which need to be renewed. + + :param bytes storage_index: The storage index to operate on. + + :param StorageServer server: The storage server to operate on. + + :return Deferred: A Deferred that fires when the lease has been renewed. + """ + # See allmydata/immutable/checker.py, _get_renewal_secret + renew_secret = bucket_renewal_secret_hash( + file_renewal_secret_hash( + renewal_secret, + storage_index, + ), + server.get_lease_seed(), + ) + return server.renew_lease( + storage_index, + renew_secret, + ) + + +def needs_lease_renew(min_lease_remaining, stat, now): + """ + Determine if a lease needs renewal. + + :param timedelta min_lease_remaining: The minimum amount of time remaining + to allow on a lease without renewing it. + + :param ShareStat stat: The metadata about a share to consider. + + :param datetime now: The current time for comparison against the lease + expiration time. + + :return bool: ``True`` if the lease needs to be renewed, ``False`` + otherwise. + """ + remaining = now - datetime.utcfromtimestamp(stat.lease_expiration) + return remaining < min_lease_remaining + + +@attr.s +class _FuzzyTimerService(Service): + """ + A service to periodically, but not *too* periodically, run an operation. + + :ivar operation: A no-argument callable to fuzzy-periodically run. It may + return a Deferred in which case the next run will not be scheduled + until the Deferred fires. + + :ivar timedelta initial_interval: The amount of time to wait before the first + run of the operation. + + :ivar sample_interval_distribution: A no-argument callable which returns a + number of seconds as a float giving the amount of time to wait before + the next run of the operation. It will be called each time the + operation completes. + + :ivar IReactorTime reactor: A Twisted reactor to use to schedule runs of + the operation. + """ + operation = attr.ib() + initial_interval = attr.ib() + sample_interval_distribution = attr.ib() + reactor = attr.ib() + + def startService(self): + Service.startService(self) + self.reactor.callLater( + self.initial_interval.total_seconds(), + self._iterate, + ) + + def _iterate(self): + """ + Run the operation once and then schedule it to run again. + """ + d = maybeDeferred(self.operation) + d.addErrback(err, "Fuzzy timer service ({})") + d.addCallback(lambda ignored: self._schedule()) + + def _schedule(self): + """ + Schedule the next run of the operation. + """ + self._call = self.reactor.callLater( + self.sample_interval_distribution(), + self._iterate, + ) + + +def lease_maintenance_service( + reactor, + root_node, + storage_broker, + secret_holder, + last_run, + random, +): + """ + Get an ``IService`` which will maintain leases on ``root_node`` and any + nodes directly or transitively reachable from it. + + :param IReactorClock reactor: A Twisted reactor for scheduling renewal + activity. + + :param IFilesystemNode root_node: A Tahoe-LAFS filesystem node to use as + the root of a node hierarchy to be maintained. + + :param StorageFarmBroker storage_broker: The storage broker which can put + us in touch with storage servers where shares of the nodes to maintain + might be found. + + :param SecretHolder secret_holder: The Tahoe-LAFS client node secret + holder which can give us the lease renewal secrets needed to renew + leases. + + :param datetime last_run: The time at which lease maintenance last ran to + inform an adjustment to the first interval before running it again, or + ``None`` not to make such an adjustment. + + :param random: An object like ``random.Random`` which can be used as a + source of scheduling delay. + """ + mean = timedelta(days=26).total_seconds() + halfrange = timedelta(days=2).total_seconds() + min_lease_remaining = timedelta(days=3) + sample_interval_distribution = partial( + random.uniform, + mean - halfrange, + mean + halfrange, + ) + if last_run is None: + initial_interval = sample_interval_distribution() + else: + initial_interval = calculate_initial_interval( + sample_interval_distribution, + last_run, + datetime.utcfromtimestamp(reactor.seconds()), + ) + initial_interval = max( + initial_interval, + timedelta(0), + ) + return _FuzzyTimerService( + partial( + renew_leases, + partial(visit_filesystem_nodes, root_node), + storage_broker, + secret_holder, + min_lease_remaining, + lambda: datetime.utcfromtimestamp(reactor.seconds()), + ), + initial_interval, + sample_interval_distribution, + reactor, + ) + +def calculate_initial_interval(sample_interval_distribution, last_run, now): + """ + Determine how long to wait before performing an initial (for this process) + scan for aging leases. + + :param sample_interval_distribution: See ``_FuzzyTimerService``. + :param datetime last_run: The time of the last scan. + :param datetime now: The current time. + """ + since_last_run = now - last_run + initial_interval = sample_interval_distribution() - since_last_run + return initial_interval diff --git a/src/_zkapauthorizer/resource.py b/src/_zkapauthorizer/resource.py index fb7e61a..69e1618 100644 --- a/src/_zkapauthorizer/resource.py +++ b/src/_zkapauthorizer/resource.py @@ -108,6 +108,13 @@ def from_configuration(node_config, store, redeemer=None): controller, ), ) + + from twisted.internet import reactor + from .lease_maintenance import lease_maintenance_service + maintenance = lease_maintenance_service() + maintenance.startService() + reactor.addSystemEventTrigger("before", "shutdown", maintenance.stopService) + return root diff --git a/src/_zkapauthorizer/tests/test_lease_maintenance.py b/src/_zkapauthorizer/tests/test_lease_maintenance.py new file mode 100644 index 0000000..b559dfd --- /dev/null +++ b/src/_zkapauthorizer/tests/test_lease_maintenance.py @@ -0,0 +1,163 @@ +# Copyright 2019 PrivateStorage.io, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Tests for ``_zkapauthorizer.lease_maintenance``. +""" + +from __future__ import ( + absolute_import, + unicode_literals, +) + +from datetime import ( + datetime, + timedelta, +) + +import attr + +from testtools import ( + TestCase, +) +from hypothesis.strategies import ( + builds, + binary, + integers, + lists, + dictionaries, +) + +from twisted.internet.task import ( + Clock, +) +from twisted.internet.defer import ( + succeed, +) +from twisted.application.service import ( + IService, +) + +from allmydata.util.hashutil import ( + CRYPTO_VAL_SIZE, +) +from allmydata.client import ( + SecretHolder, +) + +from ..foolscap import ( + ShareStat, +) + +from .matchers import ( + Provides, +) +from .strategies import ( + storage_indexes, +) + +from ..lease_maintenace import ( + lease_maintenance_service, +) + + +@attr.s +class DummyStorageServer(object): + """ + :ivar dict[bytes, datetime] buckets: A mapping from storage index to lease + expiration time for shares at that storage index. + """ + clock = attr.ib() + buckets = attr.ib() + lease_seed = attr.ib() + + def stat_shares(self, storage_indexes): + return succeed(list( + self.buckets[idx] + for idx + in storage_indexes + )) + + def get_lease_seed(self): + return self.lease_seed + + def renew_lease(self, storage_index, renew_secret): + self.buckets[storage_index].lease_expiration = ( + self.clock.seconds() + timedelta(days=31).total_seconds() + ) + + +def lease_seeds(): + return binary( + min_size=CRYPTO_VAL_SIZE, + max_size=CRYPTO_VAL_SIZE, + ) + +def share_stats(): + return builds( + ShareStat, + size=integers(min_value=0), + lease_expiration=integers(min_value=0, max_value=2 ** 31), + ) + +def storage_servers(clocks): + return builds( + DummyStorageServer, + clocks, + dictionaries(storage_indexes(), share_stats()), + lease_seeds(), + ) + + +@attr.s +class DummyStorageBroker(object): + clock = attr.ib() + _storage_servers = attr.ib() + + def get_connected_servers(self): + return self._storage_servers + + +def storage_brokers(clocks): + return builds( + DummyStorageBroker, + lists(storage_servers(clocks)), + ) + + +class LeaseMaintenanceServiceTests(TestCase): + """ + Tests for the service returned by ``lease_maintenance_service``. + """ + def test_interface(self): + """ + The service provides ``IService``. + """ + clock = Clock() + root_node = object() + random = object() + lease_secret = b"\0" * CRYPTO_VAL_SIZE + convergence_secret = b"\1" * CRYPTO_VAL_SIZE + service = lease_maintenance_service( + clock, + root_node, + DummyStorageBroker(clock, []), + SecretHolder(lease_secret, convergence_secret), + datetime.utcfromtimestamp(0), + random, + ) + self.assertThat( + service, + Provides(IService), + ) -- GitLab