Skip to content
Snippets Groups Projects
lease_maintenance.py 12.8 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2019 PrivateStorage.io, LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    """
    This module implements a service which periodically spends ZKAPs to
    refresh leases on all shares reachable from a root.
    """
    
    from functools import (
        partial,
    )
    from datetime import (
        datetime,
        timedelta,
    )
    
    import attr
    
    from twisted.internet.defer import (
        inlineCallbacks,
        maybeDeferred,
    )
    from twisted.application.service import (
        Service,
    )
    from twisted.python.log import (
        err,
    )
    
    from allmydata.interfaces import (
        IDirectoryNode,
    )
    from allmydata.util.hashutil import (
        file_renewal_secret_hash,
        bucket_renewal_secret_hash,
    )
    
    @inlineCallbacks
    def visit_storage_indexes(root_node, visit):
        """
        Walk the node graph starting at ``root_node`` and report the storage
        index of every node reached to ``visit``.

        Any node providing ``IDirectoryNode`` is listed and its children are
        queued to be visited in turn.  No record of already-visited nodes is
        kept, so a node reachable along several paths is reported once per
        path.

        :param IFilesystemNode root_node: The node at which traversal begins.

        :param visit: A one-argument callable.  It is called with the result
            of ``get_storage_index()`` for each node as it is reached.

        :return Deferred: A Deferred which fires once there are no more nodes
            left to visit.
        """
        pending = [root_node]
        while pending:
            node = pending.pop()
            visit(node.get_storage_index())
            if not IDirectoryNode.providedBy(node):
                # Only directories have children to descend into.
                continue

            listing = yield node.list()
            # Sorting the ``(name, (node, metadata))`` items orders the
            # children by name, so repeated traversals of the same directory
            # queue its children in the same order.
            pending.extend(
                child
                for (_name, (child, _metadata))
                in sorted(listing.items())
            )
    
    
    def iter_storage_indexes(visit_assets):
        """
        Collect the distinct storage indexes reported by ``visit_assets``.

        :param visit_assets: A one-argument callable.  It is passed a visit
            function, is expected to call that function with each storage
            index, and must return a Deferred.

        :return Deferred[list[bytes]]: The Deferred returned by
            ``visit_assets``, with a callback added so that it fires with a
            list of the storage indexes that were visited.  Duplicates are
            removed and the order of the list is arbitrary.
        """
        unique = set()

        def as_list(_ignored):
            # The set has already discarded duplicates; hand back a list.
            return list(unique)

        finished = visit_assets(unique.add)
        finished.addCallback(as_list)
        return finished
    
    
    @inlineCallbacks
    def renew_leases(visit_assets, storage_broker, secret_holder, min_lease_remaining, now):
        """
        Renew, on every connected storage server, the leases of the visited
        nodes which are expired or close to expiring.

        :param visit_assets: A one-argument callable which takes a visitor
            function and calls it with the storage index of every node to
            check.

        :param StorageFarmBroker storage_broker: Supplies the connected
            storage servers on which leases are checked.

        :param SecretHolder secret_holder: Supplies the renewal secret used
            for any lease which has to be renewed.

        :param timedelta min_lease_remaining: The minimum amount of time
            remaining to allow on a lease without renewing it.

        :param now: A no-argument callable returning the current time as a
            ``datetime``.  It is called once for each server.

        :return Deferred: A Deferred which fires after every connected server
            has been processed.
        """
        storage_indexes = yield iter_storage_indexes(visit_assets)
        renewal_secret = secret_holder.get_renewal_secret()

        # Servers are processed strictly one after another; each server is
        # finished before the next is started.  Consider parallelizing this.
        for server in storage_broker.get_connected_servers():
            checked_at = now()
            yield renew_leases_on_server(
                min_lease_remaining=min_lease_remaining,
                renewal_secret=renewal_secret,
                storage_indexes=storage_indexes,
                server=server,
                now=checked_at,
            )
    
    
    @inlineCallbacks
    def renew_leases_on_server(
            min_lease_remaining,
            renewal_secret,
            storage_indexes,
            server,
            now,
    ):
        """
        Renew, on a single storage server, the leases for those of the given
        storage indexes which are expired or close to expiring.

        :param timedelta min_lease_remaining: The minimum amount of time
            remaining to allow on a lease without renewing it.

        :param renewal_secret: A seed for the renewal secret hash calculation
            for any leases which need to be renewed.

        :param list[bytes] storage_indexes: The storage indexes to check.

        :param StorageServer server: The storage server to query and, where
            necessary, to renew leases on.

        :param datetime now: The current time, for comparison against the
            lease expiration time.

        :return Deferred: A Deferred which fires after every storage index has
            been considered and every required renewal has completed.
        """
        share_stats = yield server.stat_shares(storage_indexes)
        # The stats are paired with the storage indexes by position.
        for index, share_stat in zip(storage_indexes, share_stats):
            if not needs_lease_renew(min_lease_remaining, share_stat, now):
                continue
            # Renewals are issued one at a time, each awaited before the next.
            yield renew_lease(renewal_secret, index, server)
    
    
    def renew_lease(renewal_secret, storage_index, server):
        """
        Ask one server to renew the lease for one storage index.

        :param renewal_secret: A seed for the renewal secret hash calculation.

        :param bytes storage_index: The storage index to operate on.

        :param StorageServer server: The storage server to operate on.

        :return: Whatever ``server.renew_lease`` returns (a Deferred that
            fires when the lease has been renewed).
        """
        # The secret is derived in two steps: first per storage index, then
        # per server using that server's lease seed.
        # See allmydata/immutable/checker.py, _get_renewal_secret
        file_secret = file_renewal_secret_hash(renewal_secret, storage_index)
        lease_seed = server.get_lease_seed()
        bucket_secret = bucket_renewal_secret_hash(file_secret, lease_seed)
        return server.renew_lease(storage_index, bucket_secret)
    
    
    def needs_lease_renew(min_lease_remaining, stat, now):
        """
        Decide whether a lease is close enough to expiring to need renewal.

        :param timedelta min_lease_remaining: The minimum amount of time
            remaining to allow on a lease without renewing it.

        :param ShareStat stat: The share metadata to consider.  Its
            ``lease_expiration`` is read as a POSIX timestamp and converted to
            a naive UTC ``datetime``.

        :param datetime now: The current time, which is subtracted from the
            expiration time.

        :return bool: ``True`` if strictly less than ``min_lease_remaining``
            is left on the lease (including when it has already expired),
            ``False`` otherwise.
        """
        expires_at = datetime.utcfromtimestamp(stat.lease_expiration)
        time_left = expires_at - now
        return time_left < min_lease_remaining
    
    
    @attr.s
    class _FuzzyTimerService(Service):
        """
        A service to periodically, but not *too* periodically, run an operation.

        :ivar operation: A no-argument callable to fuzzy-periodically run.  It may
            return a Deferred in which case the next run will not be scheduled
            until the Deferred fires.  A failure of the operation is logged
            and does not prevent the next run from being scheduled.

        :ivar timedelta initial_interval: The amount of time to wait before the first
            run of the operation.

        :ivar sample_interval_distribution: A no-argument callable which
            returns a ``timedelta`` giving the amount of time to wait before
            the next run of the operation.  It is called each time the
            operation completes while the service is running.

        :ivar IReactorTime reactor: A Twisted reactor to use to schedule runs of
            the operation.
        """
        operation = attr.ib()
        initial_interval = attr.ib()
        sample_interval_distribution = attr.ib()
        reactor = attr.ib()

        # The pending ``IDelayedCall`` for the next run, or ``None`` when no
        # run is scheduled.  This is a plain class attribute rather than an
        # ``attr.ib`` so that the attrs-generated ``__init__``, ``__repr__``
        # and comparison methods are not affected by it.
        _call = None

        def startService(self):
            """
            Start the service and schedule the first run of the operation.
            """
            Service.startService(self)
            # Keep the delayed call so that ``stopService`` can cancel it.
            self._call = self.reactor.callLater(
                self.initial_interval.total_seconds(),
                self._iterate,
            )

        def stopService(self):
            """
            Stop the service and cancel the pending run of the operation, if
            there is one.  An operation which is already in progress is not
            interrupted but no further run is scheduled after it.
            """
            pending, self._call = self._call, None
            # The call may already have fired (the operation is in progress),
            # in which case cancelling it would raise.
            if pending is not None and pending.active():
                pending.cancel()
            return Service.stopService(self)

        def _iterate(self):
            """
            Run the operation once and then schedule it to run again.
            """
            # The delayed call which got us here has fired; forget it.
            self._call = None
            d = maybeDeferred(self.operation)
            # Log any failure.  ``err`` returns ``None`` so the callback below
            # still runs and the next iteration is still scheduled.
            d.addErrback(err, "Fuzzy timer service ({})".format(self.name))
            d.addCallback(lambda ignored: self._schedule())

        def _schedule(self):
            """
            Schedule the next run of the operation, unless the service has been
            stopped in the meantime.
            """
            if not self.running:
                return
            self._call = self.reactor.callLater(
                self.sample_interval_distribution().total_seconds(),
                self._iterate,
            )
    
    
    def lease_maintenance_service(
            maintain_leases,
            reactor,
            last_run,
            random,
            interval_mean=None,
            interval_range=None,
    ):
        """
        Get an ``IService`` which will periodically run a lease maintenance
        operation, with a randomized delay between runs.

        :param maintain_leases: A no-argument callable which performs a round of
            lease-maintenance.  The resulting service calls this periodically.

        :param IReactorClock reactor: A Twisted reactor for scheduling renewal
            activity.

        :param datetime last_run: The time at which lease maintenance last ran to
            inform an adjustment to the first interval before running it again, or
            ``None`` not to make such an adjustment.

        :param random: An object like ``random.Random`` which can be used as a
            source of scheduling delay.  Only its ``uniform`` method is used.

        :param timedelta interval_mean: The mean time between lease renewal
            checks.  If ``None``, 26 days is used.

        :param timedelta interval_range: The range of the uniform distribution of
            lease renewal checks (centered on ``interval_mean``).  If ``None``,
            4 days is used.

        :return: A ``_FuzzyTimerService`` which runs ``maintain_leases``.
        """
        if interval_mean is None:
            interval_mean = timedelta(days=26)
        if interval_range is None:
            interval_range = timedelta(days=4)
        halfrange = interval_range / 2

        def sample_interval_distribution():
            # Uniformly distributed on
            # [interval_mean - halfrange, interval_mean + halfrange].
            return timedelta(
                seconds=random.uniform(
                    (interval_mean - halfrange).total_seconds(),
                    (interval_mean + halfrange).total_seconds(),
                ),
            )

        if last_run is None:
            initial_interval = sample_interval_distribution()
        else:
            initial_interval = calculate_initial_interval(
                sample_interval_distribution,
                last_run,
                datetime.utcfromtimestamp(reactor.seconds()),
            )
            # If the last run was longer ago than the sampled interval the
            # adjusted value is negative; run immediately in that case.
            initial_interval = max(
                initial_interval,
                timedelta(0),
            )

        return _FuzzyTimerService(
            maintain_leases,
            initial_interval,
            sample_interval_distribution,
            reactor,
        )
    
    
    def visit_storage_indexes_from_root(visitor, root_node):
        """
        Build an operation for ``lease_maintenance_service`` which hands
        ``visitor`` a traversal rooted at ``root_node``.

        :param visitor: A one-argument callable.  When the operation runs it is
            called with a traversal function, which it may call as desired.
            The traversal function takes a visit callable and applies it to
            the storage index of ``root_node`` and everything reachable from
            it.

        :param IFilesystemNode root_node: The filesystem node at which traversal
            will begin.

        :return: A no-argument callable to perform the visits.
        """
        # ``visit_storage_indexes`` with its first argument fixed, leaving
        # only the visit callable to be supplied.
        traverse_from_root = partial(visit_storage_indexes, root_node)
        return partial(visitor, traverse_from_root)
    
    
    
    def maintain_leases_from_root(root_node, storage_broker, secret_holder, min_lease_remaining, get_now):
        """
        An operation for ``lease_maintenance_service`` which visits ``root_node``
        and all its children and renews their leases if they have less than
        ``min_lease_remaining`` left on them.

        :param IFilesystemNode root_node: A Tahoe-LAFS filesystem node to use as
            the root of a node hierarchy to be maintained.

        :param StorageFarmBroker storage_broker: The storage broker which can put
            us in touch with storage servers where shares of the nodes to maintain
            might be found.

        :param SecretHolder secret_holder: The Tahoe-LAFS client node secret
            holder which can give us the lease renewal secrets needed to renew
            leases.

        :param timedelta min_lease_remaining: The minimum amount of time remaining
            to allow on a lease without renewing it.

        :param get_now: A no-argument callable that returns the current time as a
            ``datetime`` instance.

        :return: A no-argument callable to perform the maintenance.
        """
        def visitor(visit_assets):
            # Bind everything except the traversal, which is supplied by
            # ``visit_storage_indexes_from_root`` when the operation runs.
            return renew_leases(
                visit_assets,
                storage_broker,
                secret_holder,
                min_lease_remaining,
                get_now,
            )

        return visit_storage_indexes_from_root(
            visitor,
            root_node,
        )


    def calculate_initial_interval(sample_interval_distribution, last_run, now):
        """
        Work out how long to wait before this process performs its first scan
        for aging leases.

        A fresh interval is sampled and the time which has already passed since
        the last scan is subtracted from it.

        :param sample_interval_distribution: See ``_FuzzyTimerService``.
        :param datetime last_run: The time of the last scan.
        :param datetime now: The current time.

        :return timedelta: The sampled interval less the time elapsed since
            ``last_run``.  This is negative if more time has elapsed than the
            sampled interval.
        """
        elapsed = now - last_run
        sampled = sample_interval_distribution()
        return sampled - elapsed