# Copyright 2019 PrivateStorage.io, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module implements a service which periodically spends ZKAPs to
refresh leases on all shares reachable from a root.
"""

from functools import (
    partial,
)
from datetime import (
    datetime,
    timedelta,
)

import attr

from twisted.internet.defer import (
    inlineCallbacks,
    maybeDeferred,
)
from twisted.application.service import (
    Service,
)
from twisted.python.log import (
    err,
)

from allmydata.interfaces import (
    IDirectoryNode,
)
from allmydata.util.hashutil import (
    file_renewal_secret_hash,
    bucket_renewal_secret_hash,
)

@inlineCallbacks
def visit_storage_indexes(root_node, visit):
    """
    Walk the node graph beneath ``root_node`` and report the storage index of
    every node encountered, ``root_node`` included.

    The walk is depth-first using an explicit stack.  A node's storage index
    is handed to ``visit`` before the node's children (if it is a directory)
    are listed.

    :param IFilesystemNode root_node: The node at which the walk begins.

    :param visit: A one-argument callable.  It is called once per visited
        node with the result of that node's ``get_storage_index()``.

    :return Deferred: A Deferred which fires once every reachable node has
        been visited.
    """
    pending = [root_node]
    while pending:
        node = pending.pop()
        visit(node.get_storage_index())
        if not IDirectoryNode.providedBy(node):
            # Only directories have children to descend into.
            continue
        listing = yield node.list()
        # Sorting the (name, (node, metadata)) items orders children by name
        # so that the traversal order is the same from run to run.
        pending.extend(
            child_node
            for (_name, (child_node, _metadata))
            in sorted(listing.items())
        )


def iter_storage_indexes(visit_assets):
    """
    Collect the distinct storage indexes reported by ``visit_assets``.

    :param visit_assets: A one-argument function.  It is passed a visitor
        callable and is expected to call that visitor with each storage index
        and to return a Deferred that fires when it is done.

    :return Deferred[list[bytes]]: The Deferred returned by ``visit_assets``,
        with a callback added so that it fires with a list of the storage
        indexes that were visited.  Duplicates are removed; the order of the
        list is arbitrary.
    """
    seen = set()

    def as_list(ignored):
        # By the time this runs the visitor has finished filling ``seen``;
        # hand the unique values back as a list.
        return list(seen)

    visiting = visit_assets(seen.add)
    visiting.addCallback(as_list)
    return visiting


@inlineCallbacks
def renew_leases(visit_assets, storage_broker, secret_holder, min_lease_remaining, now):
    """
    Find the leases, across a group of nodes, which have expired or will
    expire soon and renew them.

    The storage indexes are gathered once up front.  Each connected storage
    server is then processed in turn, one after another.

    :param visit_assets: A one-argument callable which takes a visitor
        function and calls it with the storage index of every node to check.

    :param StorageFarmBroker storage_broker: The source of the connected
        storage servers on which the nodes are checked.

    :param SecretHolder secret_holder: The source of the renewal secret used
        for any lease that has to be renewed.

    :param timedelta min_lease_remaining: The minimum amount of time remaining
        to allow on a lease without renewing it.

    :param now: A no-argument function returning the current time as a
        datetime instance.  It is called once for each server, just before
        that server is checked.

    :return Deferred: A Deferred which fires when every connected server has
        been checked and all leases that required renewal have been renewed.
    """
    indexes = yield iter_storage_indexes(visit_assets)
    secret = secret_holder.get_renewal_secret()

    # Servers are handled strictly sequentially; this could be parallelized.
    for storage_server in storage_broker.get_connected_servers():
        yield renew_leases_on_server(
            min_lease_remaining=min_lease_remaining,
            renewal_secret=secret,
            storage_indexes=indexes,
            server=storage_server,
            now=now(),
        )


@inlineCallbacks
def renew_leases_on_server(
        min_lease_remaining,
        renewal_secret,
        storage_indexes,
        server,
        now,
):
    """
    Renew, on a single storage server, every lease for the given storage
    indexes which has expired or is close to expiring.

    :param timedelta min_lease_remaining: The minimum amount of time remaining
        to allow on a lease without renewing it.

    :param renewal_secret: A seed for the renewal secret hash calculation for
        any leases which need to be renewed.

    :param list[bytes] storage_indexes: The storage indexes to check.

    :param StorageServer server: The storage server on which to check.

    :param datetime now: The current time for comparison against the lease
        expiration time.

    :return Deferred: A Deferred which fires after all storage indexes have
        been checked and any leases that need renewal have been renewed.
    """
    # One request fetches the stats for every storage index; the results are
    # paired positionally with ``storage_indexes`` below.
    share_stats = yield server.stat_shares(storage_indexes)
    for index, share_stat in zip(storage_indexes, share_stats):
        if not needs_lease_renew(min_lease_remaining, share_stat, now):
            continue
        yield renew_lease(renewal_secret, index, server)


def renew_lease(renewal_secret, storage_index, server):
    """
    Renew the lease held on one storage index on one storage server.

    :param renewal_secret: A seed for the renewal secret hash calculation.

    :param bytes storage_index: The storage index to operate on.

    :param StorageServer server: The storage server to operate on.

    :return: Whatever ``server.renew_lease`` returns (documented by the
        original author as a Deferred that fires when the lease has been
        renewed).
    """
    # The secret is derived in two steps: first per-file, from the seed and
    # the storage index, then per-bucket, from that and the server's lease
    # seed.  See allmydata/immutable/checker.py, _get_renewal_secret
    per_file_secret = file_renewal_secret_hash(renewal_secret, storage_index)
    lease_seed = server.get_lease_seed()
    per_bucket_secret = bucket_renewal_secret_hash(per_file_secret, lease_seed)
    return server.renew_lease(storage_index, per_bucket_secret)


def needs_lease_renew(min_lease_remaining, stat, now):
    """
    Decide whether the lease described by ``stat`` should be renewed.

    :param timedelta min_lease_remaining: The minimum amount of time remaining
        to allow on a lease without renewing it.

    :param ShareStat stat: The metadata about a share to consider.  Its
        ``lease_expiration`` is interpreted as a POSIX timestamp.

    :param datetime now: The current time (naive, UTC) for comparison against
        the lease expiration time.

    :return bool: ``True`` if strictly less than ``min_lease_remaining`` is
        left on the lease (including a lease that already expired),
        ``False`` otherwise.
    """
    expires_at = datetime.utcfromtimestamp(stat.lease_expiration)
    time_left = expires_at - now
    if time_left < min_lease_remaining:
        return True
    return False


@attr.s
class _FuzzyTimerService(Service):
    """
    A service to periodically, but not *too* periodically, run an operation.

    While the service is running there is at most one pending delayed call
    for the next run.  Stopping the service cancels that call, and an
    operation that finishes after the service has stopped does not schedule
    another run.

    :ivar operation: A no-argument callable to fuzzy-periodically run.  It may
        return a Deferred in which case the next run will not be scheduled
        until the Deferred fires.

    :ivar timedelta initial_interval: The amount of time to wait before the first
        run of the operation.

    :ivar sample_interval_distribution: A no-argument callable which returns a
       ``timedelta`` giving the amount of time to wait before the next run of
       the operation.  It will be called each time the operation completes.

    :ivar IReactorTime reactor: A Twisted reactor to use to schedule runs of
        the operation.
    """
    operation = attr.ib()
    initial_interval = attr.ib()
    sample_interval_distribution = attr.ib()
    reactor = attr.ib()

    # The ``IDelayedCall`` for the next scheduled run, or ``None`` if nothing
    # has been scheduled.  This is deliberately a plain class-level default
    # rather than an ``attr.ib`` so that the generated constructor, repr and
    # comparison methods are unchanged.
    _call = None

    def startService(self):
        """
        Start the service and schedule the first run of the operation after
        ``initial_interval``.
        """
        Service.startService(self)
        # Keep the delayed call so that ``stopService`` can cancel it.
        self._call = self.reactor.callLater(
            self.initial_interval.total_seconds(),
            self._iterate,
        )

    def stopService(self):
        """
        Stop the service and cancel the pending run of the operation, if
        there is one.

        :return: Whatever ``Service.stopService`` returns.
        """
        call, self._call = self._call, None
        # The call is no longer active if it has already fired (for example,
        # the operation is currently in progress); cancelling it then would
        # raise.
        if call is not None and call.active():
            call.cancel()
        return Service.stopService(self)

    def _iterate(self):
        """
        Run the operation once and then schedule it to run again.
        """
        d = maybeDeferred(self.operation)
        # Log (and thereby consume) any failure so that one bad run does not
        # prevent later runs.  The operation is included in the message to
        # identify which timer service failed.
        d.addErrback(err, "Fuzzy timer service ({})".format(self.operation))
        d.addCallback(lambda ignored: self._schedule())

    def _schedule(self):
        """
        Schedule the next run of the operation, unless the service has been
        stopped in the meantime.
        """
        if not self.running:
            # The service was stopped while the operation was in progress.
            return
        self._call = self.reactor.callLater(
            self.sample_interval_distribution().total_seconds(),
            self._iterate,
        )


def lease_maintenance_service(
        maintain_leases,
        reactor,
        last_run,
        random,
        interval_mean=None,
        interval_range=None,
):
    """
    Get an ``IService`` which periodically runs ``maintain_leases``, with the
    time between runs drawn from a uniform distribution.

    :param maintain_leases: A no-argument callable which performs a round of
        lease-maintenance.  The resulting service calls this periodically.

    :param IReactorClock reactor: A Twisted reactor for scheduling renewal
        activity.

    :param datetime last_run: The time at which lease maintenance last ran to
        inform an adjustment to the first interval before running it again, or
        ``None`` not to make such an adjustment.

    :param random: An object like ``random.Random`` which can be used as a
        source of scheduling delay.

    :param timedelta interval_mean: The mean time between lease renewal
        checks.  Defaults to 26 days.

    :param timedelta interval_range: The range of the uniform distribution of
        lease renewal checks (centered on ``interval_mean``).  Defaults to 4
        days.

    :return: A ``_FuzzyTimerService`` wrapping ``maintain_leases``.
    """
    mean = timedelta(days=26) if interval_mean is None else interval_mean
    spread = timedelta(days=4) if interval_range is None else interval_range
    half_spread = spread / 2

    # The bounds of the distribution never change so compute them just once.
    shortest = (mean - half_spread).total_seconds()
    longest = (mean + half_spread).total_seconds()

    def sample_interval_distribution():
        return timedelta(seconds=random.uniform(shortest, longest))

    if last_run is None:
        first_delay = sample_interval_distribution()
    else:
        current_time = datetime.utcfromtimestamp(reactor.seconds())
        first_delay = calculate_initial_interval(
            sample_interval_distribution,
            last_run,
            current_time,
        )
        # If the last run was longer ago than the sampled interval the
        # adjusted delay is negative; run immediately in that case.
        no_delay = timedelta(0)
        if first_delay < no_delay:
            first_delay = no_delay

    return _FuzzyTimerService(
        maintain_leases,
        first_delay,
        sample_interval_distribution,
        reactor,
    )


def visit_storage_indexes_from_root(visitor, root_node):
    """
    Build an operation for ``lease_maintenance_service`` which hands
    ``visitor`` a traversal of ``root_node`` and everything reachable from it.

    :param visitor: A one-argument callable.  It receives the traversal
        function (``visit_storage_indexes`` already bound to ``root_node``)
        and should call it as desired.

    :param IFilesystemNode root_node: The filesystem node at which traversal
        will begin.

    :return: A no-argument callable to perform the visits.
    """
    # What remains to be supplied to the traversal is just the per-node
    # visit function.
    traverse_from_root = partial(visit_storage_indexes, root_node)
    operation = partial(visitor, traverse_from_root)
    return operation


def maintain_leases_from_root(root_node, storage_broker, secret_holder, min_lease_remaining, get_now):
    """
    Build an operation for ``lease_maintenance_service`` which visits
    ``root_node`` and everything reachable from it and renews any lease with
    less than ``min_lease_remaining`` left on it.

    :param IFilesystemNode root_node: A Tahoe-LAFS filesystem node to use as
        the root of a node hierarchy to be maintained.

    :param StorageFarmBroker storage_broker: The storage broker which can put
        us in touch with storage servers where shares of the nodes to maintain
        might be found.

    :param SecretHolder secret_holder: The Tahoe-LAFS client node secret
        holder which can give us the lease renewal secrets needed to renew
        leases.

    :param timedelta min_lease_remaining: The minimum amount of time remaining
        to allow on a lease without renewing it.

    :param get_now: A no-argument callable that returns the current time as a
        ``datetime`` instance.

    :return: A no-argument callable to perform the maintenance.
    """
    # Everything ``renew_leases`` needs is known now except for the
    # traversal function, which arrives later as the lone positional
    # argument.
    renew_visited = partial(
        renew_leases,
        storage_broker=storage_broker,
        secret_holder=secret_holder,
        min_lease_remaining=min_lease_remaining,
        now=get_now,
    )
    return visit_storage_indexes_from_root(renew_visited, root_node)


def calculate_initial_interval(sample_interval_distribution, last_run, now):
    """
    Work out how long this process should wait before its first scan for
    aging leases, taking into account the time already elapsed since the
    previous scan.

    :param sample_interval_distribution: A no-argument callable returning a
        ``timedelta``; see ``_FuzzyTimerService``.
    :param datetime last_run: The time of the last scan.
    :param datetime now: The current time.

    :return timedelta: A freshly sampled interval less the time since
        ``last_run``.  This is negative if more time than that has already
        passed.
    """
    elapsed = now - last_run
    return sample_interval_distribution() - elapsed