diff --git a/docs/source/index.rst b/docs/source/index.rst
index d178b7d084c4716add8d5527a579dab470159b3d..629c8c9321786cc2c14e73ab22f283a098eae945 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -13,6 +13,7 @@ Welcome to ZKAP Authorizer's documentation!
    code_of_conduct
    CONTRIBUTING
    interface
+   leases
 
 Indices and tables
 ==================
diff --git a/docs/source/leases.rst b/docs/source/leases.rst
new file mode 100644
index 0000000000000000000000000000000000000000..91e743a83828a53e9c3ffac2a60edfbe8b8cab1f
--- /dev/null
+++ b/docs/source/leases.rst
@@ -0,0 +1,68 @@
+Leases
+======
+
+Leases held on shares are treated as a guarantee that a storage server will hold those shares for the duration of the lease.
+Leases have an expiration date which a renewal operation can move to a date a fixed distance in the future of the renewal.
+Lease renewal requires the expenditure of ZKAPs in proportion to the size of the shares and the distance to the new expiration date.
+Because the lease expiration date is advanced from the time of the renewal and not the time of the original expiration,
+care is taken to renew only those leases which will soon expire.
+
+Design
+------
+
+The process of checking lease ages and renewing leases which are close to expiration is automated in the client storage plugin.
+The storage plugin interface is not ideally shaped to support this functionality.
+The following designs have been considered.
+
+Option A
+~~~~~~~~
+
+Each ZKAPAuthorizerStorageClient is a service which is a child of the client node.
+Each creates its own service child using lease_maintenance_service().
+This results in a linear factor of redundant lease maintenance work (equal to the number of storage servers).
+Requires a change to Tahoe-LAFS to add clients as service children.
+
+Option B
+~~~~~~~~
+
+Each ZKAPAuthorizerStorageClient is a service which is a child of the client node.
+Each creates its own service child using lease_maintenance_service().
+The lease maintenance function is augmented with a check against all other lease maintenance services.
+Only the service with the smallest arbitrary sort key ever actually runs.
+This results in a small linear-factor overhead (in the number of storage servers) to choose a winner but no redundant lease maintenance work.
+Requires a change to Tahoe-LAFS to add clients as service children.
+
+Option C
+~~~~~~~~
+
+The plugin interface has a method to create a service which is a child of the client node.
+The service is the lease maintenance service as created by lease_maintenance_service().
+There is only one service so there is no winner-selection overhead or redundant lease maintenance work.
+Requires a change to Tahoe-LAFS to call the new method to get the service and add the result as a service child.
+
+Option D
+~~~~~~~~
+
+The plugin creates and starts a single lease maintenance service itself.
+The plugin reaches deep into the guts of the runtime to find a client node so it can initialize the lease maintenance service
+(an expression like ``get_rref.im_self._on_status_changed.watchers[0].__closure__[0].cell_contents`` was considered to reach the ``StorageFarmBroker`` which is a child of ``_Client``).
+The plugin glues it into the reactor itself for shutdown notification.
+There is only one service so no winner-selection or redundant lease maintenance work is required.
+This can be improved to Option C at some point.
+
+On closer inspection, even the complex expression above is not sufficient to reach the correct object.
+Even if a similar expression is found which works,
+this option is likely more complex and fragile than *Option E*.
+
+Option E
+~~~~~~~~
+
+The plugin creates and starts a single lease maintenance service itself.
+The plugin monkey-patches ``allmydata.client._Client`` to perform initialization of the service at an appropriate time.
+There is only one service so no winner-selection or redundant lease maintenance work is required.
+This can be improved to Option C at some point.
+
+Implementation
+--------------
+
+*Option E* is currently implemented.
+Monkey-patching is performed at import time by ``_zkapauthorizer._plugin``.
diff --git a/src/_zkapauthorizer/_plugin.py b/src/_zkapauthorizer/_plugin.py
index 0711f2c7a1fe9091050d05738ed4fdb1c9471846..262db7788a568f0c0aadfe167c40385518e3528a 100644
--- a/src/_zkapauthorizer/_plugin.py
+++ b/src/_zkapauthorizer/_plugin.py
@@ -17,19 +17,23 @@ The Twisted plugin that glues the Zero-Knowledge Access Pass system into
 Tahoe-LAFS.
 """
 
+import random
 from weakref import (
     WeakValueDictionary,
 )
 from datetime import (
     datetime,
+    timedelta,
 )
-
 import attr
 
 from zope.interface import (
     implementer,
 )
+from twisted.logger import (
+    Logger,
+)
 from twisted.python.filepath import (
     FilePath,
 )
@@ -41,6 +45,9 @@ from allmydata.interfaces import (
     IFoolscapStoragePlugin,
     IAnnounceableStorageServer,
 )
+from allmydata.client import (
+    _Client,
+)
 from privacypass import (
     SigningKey,
 )
@@ -61,6 +68,13 @@ from .resource import (
 from .controller import (
     get_redeemer,
 )
+from .lease_maintenance import (
+    SERVICE_NAME,
+    lease_maintenance_service,
+    maintain_leases_from_root,
+)
+
+_log = Logger()
 
 @implementer(IAnnounceableStorageServer)
 @attr.s
@@ -69,8 +83,8 @@ class AnnounceableStorageServer(object):
 
     storage_server = attr.ib()
 
-@attr.s
 @implementer(IFoolscapStoragePlugin)
+@attr.s
 class ZKAPAuthorizer(object):
     """
     A storage plugin which provides a token-based access control mechanism on
@@ -161,3 +175,78 @@ class ZKAPAuthorizer(object):
             store=self._get_store(node_config),
             redeemer=self._get_redeemer(node_config, None, reactor),
         )
+
+
+_init_storage = _Client.__dict__["init_storage"]
+def maintenance_init_storage(self, announceable_storage_servers):
+    """
+    A monkey-patched version of ``_Client.init_storage`` which also
+    initializes the lease maintenance service.
+    """
+    from twisted.internet import reactor
+    result = _init_storage(self, announceable_storage_servers)
+    _maybe_attach_maintenance_service(reactor, self)
+    return result
+_Client.init_storage = maintenance_init_storage
+
+
+def _maybe_attach_maintenance_service(reactor, client_node):
+    """
+    Check for an existing lease maintenance service and if one is not found,
+    create one.
+
+    :param reactor: The reactor which supplies the current time and the
+        scheduling for any created service.
+
+    :param allmydata.client._Client client_node: The client node to check and,
+        possibly, modify.  A lease maintenance service is added to it if and
+        only if one is not already present.
+    """
+    try:
+        # If there is already one we don't need another.
+        client_node.getServiceNamed(SERVICE_NAME)
+    except KeyError:
+        # There isn't one so make it and add it.
+        _log.info("Creating new lease maintenance service")
+        try:
+            _create_maintenance_service(
+                reactor,
+                client_node.config,
+                client_node,
+            ).setServiceParent(client_node)
+        except Exception:
+            # An exception raised inside an ``except`` suite is not caught by
+            # a later clause of the same ``try``, so the creation attempt
+            # needs its own handler if failures are to be logged.
+            _log.failure("Attaching maintenance service to client node")
+    else:
+        _log.info("Found existing lease maintenance service")
+
+
+def _create_maintenance_service(reactor, node_config, client_node):
+    """
+    Create a lease maintenance service to be attached to the given client
+    node.
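+
+    :param reactor: The reactor which will supply the service's sense of time
+        and schedule its runs.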
+
+    :param allmydata.node._Config node_config: The configuration for the node
+        the lease maintenance service will be attached to.
+
+    :param allmydata.client._Client client_node: The client node the lease
+        maintenance service will be attached to.
+    """
+    def get_now():
+        return datetime.utcfromtimestamp(reactor.seconds())
+
+    # Create the operation which performs the lease maintenance job when
+    # called.
+    maintain_leases = maintain_leases_from_root(
+        client_node.create_node_from_uri(
+            node_config.get_private_config(b"rootcap"),
+        ),
+        client_node.get_storage_broker(),
+        client_node._secret_holder,
+        # Make this configurable.
+        timedelta(days=3),
+        get_now,
+    )
+    last_run_path = FilePath(node_config.get_private_path(b"last-lease-maintenance-run"))
+    # Create the service to periodically run the lease maintenance operation.
+    return lease_maintenance_service(
+        maintain_leases,
+        reactor,
+        last_run_path,
+        random,
+    )
diff --git a/src/_zkapauthorizer/foolscap.py b/src/_zkapauthorizer/foolscap.py
index f0137ac2714a51bf15fd127b2c4265b4017e50cb..3d69a5358f258f6fa2807e04dc0b0d8563a243ca 100644
--- a/src/_zkapauthorizer/foolscap.py
+++ b/src/_zkapauthorizer/foolscap.py
@@ -175,9 +175,16 @@ class RIPrivacyPassAuthorizedStorageServer(RemoteInterface):
     ):
         """
         Get various metadata about shares in the given storage index or slot.
+
+        :return [{int: ShareStat}]: A list of share stats.  Each dictionary
+            in the list corresponds to one of the storage indexes requested
+            by the ``storage_indexes_or_slots`` argument.  Items in the
+            dictionary give share stats for each share known to this server
+            to be associated with the corresponding storage index.  Keys are
+            share numbers and values are the stats.
         """
         # Any() should be ShareStat but I don't know how to spell that.
-        return ListOf(ListOf(DictOf(int, Any())))
+        return ListOf(DictOf(int, Any()))
 
     slot_readv = RIStorageServer["slot_readv"]
diff --git a/src/_zkapauthorizer/lease_maintenance.py b/src/_zkapauthorizer/lease_maintenance.py
new file mode 100644
index 0000000000000000000000000000000000000000..e7e0bb3c3d53cf64b89e865692c372323b031924
--- /dev/null
+++ b/src/_zkapauthorizer/lease_maintenance.py
@@ -0,0 +1,475 @@
+# Copyright 2019 PrivateStorage.io, LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This module implements a service which periodically spends ZKAPs to
+refresh leases on all shares reachable from a root.
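+
+A rough sketch of the intended use (the actual wiring is performed by
+``_zkapauthorizer._plugin`` at client startup; the names below are
+illustrative)::
+
+    service = lease_maintenance_service(
+        maintain_leases,    # a no-argument callable doing one renewal round
+        reactor,
+        FilePath("last-run"),
+        random,
+    )
+    service.setServiceParent(client_node)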
+""" + +from functools import ( + partial, +) +from datetime import ( + datetime, + timedelta, +) +from errno import ( + ENOENT, +) +import attr + +from aniso8601 import ( + parse_datetime, +) + +from twisted.internet.defer import ( + inlineCallbacks, + maybeDeferred, +) +from twisted.application.service import ( + Service, +) +from twisted.python.log import ( + err, +) + +from allmydata.interfaces import ( + IDirectoryNode, +) +from allmydata.util.hashutil import ( + file_renewal_secret_hash, + bucket_renewal_secret_hash, +) + +from .controller import ( + bracket, +) + +SERVICE_NAME = u"lease maintenance service" + +@inlineCallbacks +def visit_storage_indexes(root_node, visit): + """ + Call a visitor with the storage index of ``root_node`` and that of all + nodes reachable from it. + + :param IFilesystemNode root_node: The node from which to start. + + :param visit: A one-argument callable. It will be called with the storage + index of all visited nodes. + + :return Deferred: A Deferred which fires after all nodes have been + visited. + """ + stack = [root_node] + while stack: + elem = stack.pop() + visit(elem.get_storage_index()) + if IDirectoryNode.providedBy(elem): + children = yield elem.list() + # Produce consistent results by forcing some consistent ordering + # here. This will sort by name. + stable_children = sorted(children.items()) + for (name, (child_node, child_metadata)) in stable_children: + stack.append(child_node) + + +def iter_storage_indexes(visit_assets): + """ + Get an iterator over storage indexes of all nodes visited by + ``visit_assets``. + + :param visit_assets: A one-argument function which takes a visit function + and calls it with all nodes to visit. + + :return Deferred[list[bytes]]: A Deferred that fires with a list of + storage indexes from the visited nodes. The list is in an arbitrary + order and does not include duplicates if any nodes were visited more + than once. + """ + storage_indexes = set() + visit = storage_indexes.add + d = visit_assets(visit) + # Create some order now that we've ensured they're unique. + d.addCallback(lambda ignored: list(storage_indexes)) + return d + + +@inlineCallbacks +def renew_leases(visit_assets, storage_broker, secret_holder, min_lease_remaining, now): + """ + Check the leases on a group of nodes for those which are expired or close + to expiring and renew such leases. + + :param visit_assets: A one-argument callable which takes a visitor + function and calls it with the storage index of every node to check. + + :param StorageFarmBroker storage_broker: A storage broker which can supply + the storage servers where the nodes should be checked. + + :param SecretHolder secret_holder: The source of the renew secret for any + leases which require renewal. + + :param timedelta min_lease_remaining: The minimum amount of time remaining + to allow on a lease without renewing it. + + :param now: A no-argument function returning the current time, as a + datetime instance, for comparison against lease expiration time. + + :return Deferred: A Deferred which fires when all visitable nodes have + been checked and any leases renewed which required it. + """ + storage_indexes = yield iter_storage_indexes(visit_assets) + + renewal_secret = secret_holder.get_renewal_secret() + servers = storage_broker.get_connected_servers() + + for server in servers: + # Consider parallelizing this. 
+        yield renew_leases_on_server(
+            min_lease_remaining,
+            renewal_secret,
+            storage_indexes,
+            server,
+            now(),
+        )
+
+
+@inlineCallbacks
+def renew_leases_on_server(
+        min_lease_remaining,
+        renewal_secret,
+        storage_indexes,
+        server,
+        now,
+):
+    """
+    Check the leases on shares at the given storage indexes on the given
+    storage server and renew any which are expired or close to expiring.
+
+    :param timedelta min_lease_remaining: The minimum amount of time remaining
+        to allow on a lease without renewing it.
+
+    :param renewal_secret: A seed for the renewal secret hash calculation for
+        any leases which need to be renewed.
+
+    :param list[bytes] storage_indexes: The storage indexes to check.
+
+    :param StorageServer server: The storage server on which to check.
+
+    :param datetime now: The current time for comparison against the lease
+        expiration time.
+
+    :return Deferred: A Deferred which fires after all storage indexes have
+        been checked and any leases that need renewal have been renewed.
+    """
+    stats = yield server.stat_shares(storage_indexes)
+    for storage_index, stat_dict in zip(storage_indexes, stats):
+        if not stat_dict:
+            # The server has no shares for this storage index.
+            continue
+        # All shares have the same lease information.
+        stat = stat_dict.popitem()[1]
+        if needs_lease_renew(min_lease_remaining, stat, now):
+            yield renew_lease(renewal_secret, storage_index, server)
+
+
+def renew_lease(renewal_secret, storage_index, server):
+    """
+    Renew the lease on the shares in one storage index on one server.
+
+    :param renewal_secret: A seed for the renewal secret hash calculation for
+        any leases which need to be renewed.
+
+    :param bytes storage_index: The storage index to operate on.
+
+    :param StorageServer server: The storage server to operate on.
+
+    :return Deferred: A Deferred that fires when the lease has been renewed.
+    """
+    # See allmydata/immutable/checker.py, _get_renewal_secret
+    renew_secret = bucket_renewal_secret_hash(
+        file_renewal_secret_hash(
+            renewal_secret,
+            storage_index,
+        ),
+        server.get_lease_seed(),
+    )
+    return server.renew_lease(
+        storage_index,
+        renew_secret,
+    )
+
+
+def needs_lease_renew(min_lease_remaining, stat, now):
+    """
+    Determine if a lease needs renewal.
+
+    :param timedelta min_lease_remaining: The minimum amount of time remaining
+        to allow on a lease without renewing it.
+
+    :param ShareStat stat: The metadata about a share to consider.
+
+    :param datetime now: The current time for comparison against the lease
+        expiration time.
+
+    :return bool: ``True`` if the lease needs to be renewed, ``False``
+        otherwise.
+    """
+    remaining = datetime.utcfromtimestamp(stat.lease_expiration) - now
+    return remaining < min_lease_remaining
+
+
+@attr.s
+class _FuzzyTimerService(Service):
+    """
+    A service to periodically, but not *too* periodically, run an operation.
+
+    :ivar operation: A no-argument callable to fuzzy-periodically run.  It may
+        return a Deferred in which case the next run will not be scheduled
+        until the Deferred fires.
+
+    :ivar timedelta initial_interval: The amount of time to wait before the
+        first run of the operation.
+
+    :ivar sample_interval_distribution: A no-argument callable which returns
+        a ``timedelta`` giving the amount of time to wait before the next run
+        of the operation.  It will be called each time the operation
+        completes.
+
+    :ivar IReactorTime reactor: A Twisted reactor to use to schedule runs of
+        the operation.
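+
+    A minimal sketch of use (the names here are assumed for illustration)::
+
+        service = _FuzzyTimerService(
+            name=u"example timer",
+            operation=lambda: None,
+            initial_interval=timedelta(seconds=30),
+            sample_interval_distribution=lambda: timedelta(minutes=10),
+            reactor=reactor,
+        )
+        service.startService()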
+    """
+    name = attr.ib()
+    operation = attr.ib()
+    initial_interval = attr.ib()
+    sample_interval_distribution = attr.ib()
+    reactor = attr.ib()
+
+    def startService(self):
+        Service.startService(self)
+        self._call = self.reactor.callLater(
+            self.initial_interval.total_seconds(),
+            self._iterate,
+        )
+
+    def stopService(self):
+        self._call.cancel()
+        self._call = None
+        return Service.stopService(self)
+
+    def _iterate(self):
+        """
+        Run the operation once and then schedule it to run again.
+        """
+        d = maybeDeferred(self.operation)
+        d.addErrback(err, "Fuzzy timer service ({})".format(self.name))
+        d.addCallback(lambda ignored: self._schedule())
+
+    def _schedule(self):
+        """
+        Schedule the next run of the operation.
+        """
+        self._call = self.reactor.callLater(
+            self.sample_interval_distribution().total_seconds(),
+            self._iterate,
+        )
+
+
+def lease_maintenance_service(
+        maintain_leases,
+        reactor,
+        last_run_path,
+        random,
+        interval_mean=None,
+        interval_range=None,
+):
+    """
+    Get an ``IService`` which will periodically run ``maintain_leases`` so
+    that leases stay fresh on every node it covers.
+
+    :param maintain_leases: A no-argument callable which performs a round of
+        lease maintenance.  The resulting service calls this periodically.
+
+    :param IReactorClock reactor: A Twisted reactor for scheduling renewal
+        activity.
+
+    :param FilePath last_run_path: A path containing the time (as an ISO8601
+        datetime string) at which lease maintenance last ran to inform an
+        adjustment to the first interval before running it again.  If no file
+        exists at the path it is treated as though there has been no previous
+        run.  The path will also be rewritten on each run to update this
+        value.
+
+    :param random: An object like ``random.Random`` which can be used as a
+        source of scheduling delay.
+
+    :param timedelta interval_mean: The mean time between lease renewal checks.
+
+    :param timedelta interval_range: The range of the uniform distribution of
+        lease renewal checks (centered on ``interval_mean``).
+    """
+    if interval_mean is None:
+        interval_mean = timedelta(days=26)
+    if interval_range is None:
+        interval_range = timedelta(days=4)
+    halfrange = interval_range / 2
+
+    def sample_interval_distribution():
+        return timedelta(
+            seconds=random.uniform(
+                (interval_mean - halfrange).total_seconds(),
+                (interval_mean + halfrange).total_seconds(),
+            ),
+        )
+    # Rather than an all-or-nothing last-run time we probably eventually want
+    # to have a more comprehensive record of the state when we were last
+    # interrupted.  This would remove the unfortunate behavior of restarting
+    # from the beginning if we shut down during a lease scan.  Shutting down
+    # during a lease scan becomes increasingly likely the more shares there
+    # are to check.
+    last_run = read_time_from_path(last_run_path)
+    if last_run is None:
+        initial_interval = sample_interval_distribution()
+    else:
+        initial_interval = calculate_initial_interval(
+            sample_interval_distribution,
+            last_run,
+            datetime.utcfromtimestamp(reactor.seconds()),
+        )
+        initial_interval = max(
+            initial_interval,
+            timedelta(0),
+        )
+
+    return _FuzzyTimerService(
+        SERVICE_NAME,
+        lambda: bracket(
+            lambda: None,
+            lambda: write_time_to_path(
+                last_run_path,
+                datetime.utcfromtimestamp(reactor.seconds()),
+            ),
+            maintain_leases,
+        ),
+        initial_interval,
+        sample_interval_distribution,
+        reactor,
+    )
+
+
+def write_time_to_path(path, when):
+    """
+    Write an ISO8601 datetime string to a file.
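+
+    The value is written in the format produced by ``datetime.isoformat`` so
+    that ``read_time_from_path`` (which parses with ``aniso8601``) can read
+    it back.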
+ + :param FilePath path: The path to a file to which to write the datetime + string. + + :param datetime when: The datetime to write. + """ + path.setContent(when.isoformat()) + + +def read_time_from_path(path): + """ + Read an ISO8601 datetime string from a file. + + :param FilePath path: The path to a file containing a datetime string. + + :return: None if no file exists at the path. Otherwise, a datetime + instance giving the time represented in the file. + """ + try: + when = path.getContent() + except IOError as e: + if ENOENT == e.errno: + return None + raise + else: + return parse_datetime(when) + + +def visit_storage_indexes_from_root(visitor, root_node): + """ + An operation for ``lease_maintenance_service`` which applies the given + visitor to ``root_node`` and all its children. + + :param visitor: A one-argument callable which takes the traversal function + and which should call it as desired. + + :param IFilesystemNode root_node: The filesystem node at which traversal + will begin. + + :return: A no-argument callable to perform the visits. + """ + return partial( + visitor, + partial(visit_storage_indexes, root_node), + ) + + +def maintain_leases_from_root(root_node, storage_broker, secret_holder, min_lease_remaining, get_now): + """ + An operation for ``lease_maintenance_service`` which visits ``root_node`` + and all its children and renews their leases if they have + ``min_lease_remaining`` or less on them. + + :param IFilesystemNode root_node: A Tahoe-LAFS filesystem node to use as + the root of a node hierarchy to be maintained. + + :param StorageFarmBroker storage_broker: The storage broker which can put + us in touch with storage servers where shares of the nodes to maintain + might be found. + + :param SecretHolder secret_holder: The Tahoe-LAFS client node secret + holder which can give us the lease renewal secrets needed to renew + leases. + + :param timedelta min_lease_remaining: The minimum amount of time remaining + to allow on a lease without renewing it. + + :param get_now: A no-argument callable that returns the current time as a + ``datetime`` instance. + + :return: A no-argument callable to perform the maintenance. + """ + def visitor(visit_assets): + return renew_leases( + visit_assets, + storage_broker, + secret_holder, + min_lease_remaining, + get_now, + ) + + return visit_storage_indexes_from_root( + visitor, + root_node, + ) + + +def calculate_initial_interval(sample_interval_distribution, last_run, now): + """ + Determine how long to wait before performing an initial (for this process) + scan for aging leases. + + :param sample_interval_distribution: See ``_FuzzyTimerService``. + :param datetime last_run: The time of the last scan. + :param datetime now: The current time. + """ + since_last_run = now - last_run + initial_interval = sample_interval_distribution() - since_last_run + return initial_interval diff --git a/src/_zkapauthorizer/tests/matchers.py b/src/_zkapauthorizer/tests/matchers.py index bcb4edbd37f4e8a5642405cf070e20f1d805b213..b3730b75554bac25fa5432cb39f82a97da413cb9 100644 --- a/src/_zkapauthorizer/tests/matchers.py +++ b/src/_zkapauthorizer/tests/matchers.py @@ -16,6 +16,10 @@ Testtools matchers useful for the test suite. 
""" +from datetime import ( + datetime, +) + import attr from testtools.matchers import ( @@ -23,6 +27,13 @@ from testtools.matchers import ( Mismatch, ContainsDict, Always, + MatchesAll, + MatchesAny, + GreaterThan, + LessThan, + Equals, + AfterPreprocessing, + AllMatch, ) @attr.s @@ -78,3 +89,47 @@ class _Returns(Matcher): def __str__(self): return "Returns({})".format(self.result_matcher) + + +def between(low, high): + """ + Matches a value in the range [low, high]. + """ + return MatchesAll( + MatchesAny( + Equals(low), + GreaterThan(low), + ), + MatchesAny( + Equals(high), + LessThan(high), + ), + ) + + +def leases_current(relevant_storage_indexes, now, min_lease_remaining): + """ + Return a matcher on a ``DummyStorageServer`` instance which matches + servers for which the leases on the given storage indexes do not expire + before ``min_lease_remaining``. + """ + return AfterPreprocessing( + # Get share stats for storage indexes we should have + # visited and maintained. + lambda storage_server: list( + stat + for (storage_index, stat) + in storage_server.buckets.items() + if storage_index in relevant_storage_indexes + ), + AllMatch( + AfterPreprocessing( + # Lease expiration for anything visited must be + # further in the future than min_lease_remaining, + # either because it had time left or because we + # renewed it. + lambda share_stat: datetime.utcfromtimestamp(share_stat.lease_expiration), + GreaterThan(now + min_lease_remaining), + ), + ), + ) diff --git a/src/_zkapauthorizer/tests/strategies.py b/src/_zkapauthorizer/tests/strategies.py index f4d1ce5029dd1975e220ebd9d078c8fb3a7320f5..b4d5daaca56d9a28e0c18bea6ff994580752c9ba 100644 --- a/src/_zkapauthorizer/tests/strategies.py +++ b/src/_zkapauthorizer/tests/strategies.py @@ -25,6 +25,10 @@ from datetime import ( import attr +from zope.interface import ( + implementer, +) + from hypothesis.strategies import ( one_of, just, @@ -40,8 +44,12 @@ from hypothesis.strategies import ( fixed_dictionaries, builds, datetimes, + recursive, ) +from twisted.internet.defer import ( + succeed, +) from twisted.internet.task import ( Clock, ) @@ -50,6 +58,8 @@ from twisted.web.test.requesthelper import ( ) from allmydata.interfaces import ( + IFilesystemNode, + IDirectoryNode, HASH_SIZE, ) @@ -583,3 +593,71 @@ def clocks(now=posix_safe_datetimes()): c.advance((when - _POSIX_EPOCH).total_seconds()) return c return now.map(clock_at_time) + + + + +@implementer(IFilesystemNode) +@attr.s +class _LeafNode(object): + _storage_index = attr.ib() + + def get_storage_index(self): + return self._storage_index + + # For testing + def flatten(self): + return [self] + + +def leaf_nodes(): + return storage_indexes().map(_LeafNode) + + +@implementer(IDirectoryNode) +@attr.s +class _DirectoryNode(object): + _storage_index = attr.ib() + _children = attr.ib() + + def list(self): + return succeed(self._children) + + def get_storage_index(self): + return self._storage_index + + # For testing + def flatten(self): + result = [self] + for (node, _) in self._children.values(): + result.extend(node.flatten()) + return result + + +def directory_nodes(child_strategy): + """ + Build directory nodes with children drawn from the given strategy. + """ + children = dictionaries( + text(), + tuples( + child_strategy, + just({}), + ), + ) + return builds( + _DirectoryNode, + storage_indexes(), + children, + ) + + +def node_hierarchies(): + """ + Build hierarchies of ``IDirectoryNode`` and other ``IFilesystemNode`` + (incomplete) providers. 
+    """
+    return recursive(
+        leaf_nodes(),
+        directory_nodes,
+    )
diff --git a/src/_zkapauthorizer/tests/test_lease_maintenance.py b/src/_zkapauthorizer/tests/test_lease_maintenance.py
new file mode 100644
index 0000000000000000000000000000000000000000..0c1452d9a527400d87eeacec824bd64168b465a1
--- /dev/null
+++ b/src/_zkapauthorizer/tests/test_lease_maintenance.py
@@ -0,0 +1,511 @@
+# Copyright 2019 PrivateStorage.io, LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Tests for ``_zkapauthorizer.lease_maintenance``.
+"""
+
+from __future__ import (
+    absolute_import,
+    unicode_literals,
+)
+
+from datetime import (
+    datetime,
+    timedelta,
+)
+
+import attr
+
+from testtools import (
+    TestCase,
+)
+from testtools.matchers import (
+    Is,
+    Equals,
+    Always,
+    HasLength,
+    MatchesAll,
+    AllMatch,
+    AfterPreprocessing,
+)
+from testtools.twistedsupport import (
+    succeeded,
+)
+from fixtures import (
+    TempDir,
+)
+from hypothesis import (
+    given,
+    note,
+)
+from hypothesis.strategies import (
+    builds,
+    binary,
+    integers,
+    lists,
+    floats,
+    dictionaries,
+    randoms,
+    composite,
+    just,
+)
+
+from twisted.python.filepath import (
+    FilePath,
+)
+from twisted.internet.task import (
+    Clock,
+)
+from twisted.internet.defer import (
+    succeed,
+    maybeDeferred,
+)
+from twisted.application.service import (
+    IService,
+)
+
+from allmydata.util.hashutil import (
+    CRYPTO_VAL_SIZE,
+)
+from allmydata.client import (
+    SecretHolder,
+)
+
+from ..foolscap import (
+    ShareStat,
+)
+
+from .matchers import (
+    Provides,
+    between,
+    leases_current,
+)
+from .strategies import (
+    storage_indexes,
+    clocks,
+    leaf_nodes,
+    node_hierarchies,
+)
+
+from ..lease_maintenance import (
+    lease_maintenance_service,
+    maintain_leases_from_root,
+    visit_storage_indexes_from_root,
+    renew_leases,
+)
+
+
+def interval_means():
+    return floats(
+        # It doesn't make sense to have a negative check interval mean.
+        min_value=0,
+        # We can't make this value too large or it isn't convertible to a
+        # timedelta.  Also, even values as large as this one are of
+        # questionable value.
+        max_value=60 * 60 * 24 * 365,
+    ).map(
+        # By representing the result as a timedelta we avoid the cases where
+        # the lower precision of timedelta compared to float drops the whole
+        # value (anything between 0 and 1 microsecond).  This is just one
+        # example of how working with timedeltas is nicer, in general.
+        lambda s: timedelta(seconds=s),
+    )
+
+
+def dummy_maintain_leases():
+    pass
+
+
+@attr.s
+class DummyStorageServer(object):
+    """
+    A dummy implementation of ``IStorageServer`` from Tahoe-LAFS.
+
+    :ivar dict[bytes, ShareStat] buckets: A mapping from storage index to the
+        stat (including lease expiration) for shares at that storage index.
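+
+    :ivar clock: An ``IReactorTime`` provider used to compute new expiration
+        times when a lease is renewed.
+
+    :ivar bytes lease_seed: A 20-byte value reported as this server's lease
+        seed.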
+    """
+    clock = attr.ib()
+    buckets = attr.ib()
+    lease_seed = attr.ib()
+
+    def stat_shares(self, storage_indexes):
+        return succeed(list(
+            {0: self.buckets[idx]} if idx in self.buckets else {}
+            for idx
+            in storage_indexes
+        ))
+
+    def get_lease_seed(self):
+        return self.lease_seed
+
+    def renew_lease(self, storage_index, renew_secret):
+        self.buckets[storage_index].lease_expiration = (
+            self.clock.seconds() + timedelta(days=31).total_seconds()
+        )
+
+
+def lease_seeds():
+    return binary(
+        min_size=20,
+        max_size=20,
+    )
+
+
+def share_stats():
+    return builds(
+        ShareStat,
+        size=integers(min_value=0),
+        lease_expiration=integers(min_value=0, max_value=2 ** 31),
+    )
+
+
+def storage_servers(clocks):
+    return builds(
+        DummyStorageServer,
+        clocks,
+        dictionaries(storage_indexes(), share_stats()),
+        lease_seeds(),
+    )
+
+
+@attr.s
+class DummyStorageBroker(object):
+    clock = attr.ib()
+    _storage_servers = attr.ib()
+
+    def get_connected_servers(self):
+        return self._storage_servers
+
+
+@composite
+def storage_brokers(draw, clocks):
+    clock = draw(clocks)
+    return DummyStorageBroker(
+        clock,
+        draw(lists(storage_servers(just(clock)))),
+    )
+
+
+class LeaseMaintenanceServiceTests(TestCase):
+    """
+    Tests for the service returned by ``lease_maintenance_service``.
+    """
+    @given(randoms())
+    def test_interface(self, random):
+        """
+        The service provides ``IService``.
+        """
+        clock = Clock()
+        service = lease_maintenance_service(
+            dummy_maintain_leases,
+            clock,
+            FilePath(self.useFixture(TempDir()).join(u"last-run")),
+            random,
+        )
+        self.assertThat(
+            service,
+            Provides([IService]),
+        )
+
+    @given(
+        randoms(),
+        interval_means(),
+    )
+    def test_initial_interval(self, random, mean):
+        """
+        When constructed without a value for ``last_run``,
+        ``lease_maintenance_service`` schedules its first run to take place
+        after an interval drawn uniformly from a range centered on ``mean``
+        with a width of ``range_``.
+        """
+        clock = Clock()
+        # Construct a range that fits in with the mean
+        range_ = timedelta(
+            seconds=random.uniform(0, mean.total_seconds()),
+        )
+
+        service = lease_maintenance_service(
+            dummy_maintain_leases,
+            clock,
+            FilePath(self.useFixture(TempDir()).join(u"last-run")),
+            random,
+            mean,
+            range_,
+        )
+        service.startService()
+        [maintenance_call] = clock.getDelayedCalls()
+
+        datetime_now = datetime.utcfromtimestamp(clock.seconds())
+        low = datetime_now + mean - (range_ / 2)
+        high = datetime_now + mean + (range_ / 2)
+        self.assertThat(
+            datetime.utcfromtimestamp(maintenance_call.getTime()),
+            between(low, high),
+        )
+
+    @given(
+        randoms(),
+        clocks(),
+        interval_means(),
+        interval_means(),
+    )
+    def test_initial_interval_with_last_run(self, random, clock, mean, since_last_run):
+        """
+        When constructed with a value for ``last_run``,
+        ``lease_maintenance_service`` schedules its first run to take place
+        sooner than it otherwise would, by at most the time since the last
+        run.
+        """
+        datetime_now = datetime.utcfromtimestamp(clock.seconds())
+        # Construct a range that fits in with the mean
+        range_ = timedelta(
+            seconds=random.uniform(0, mean.total_seconds()),
+        )
+
+        # Figure out the absolute last run time.
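+        # The service should schedule its first run at roughly last_run plus
+        # a sampled interval, i.e. at sampled_interval - since_last_run from
+        # now, floored at zero by lease_maintenance_service.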
+ last_run = datetime_now - since_last_run + last_run_path = FilePath(self.useFixture(TempDir()).join(u"last-run")) + last_run_path.setContent(last_run.isoformat()) + + service = lease_maintenance_service( + dummy_maintain_leases, + clock, + last_run_path, + random, + mean, + range_, + ) + service.startService() + [maintenance_call] = clock.getDelayedCalls() + + low = datetime_now + max( + timedelta(0), + mean - (range_ / 2) - since_last_run, + ) + high = max( + # If since_last_run is one microsecond (precision of timedelta) + # then the range is indivisible. Avoid putting the expected high + # below the expected low. + low, + datetime_now + mean + (range_ / 2) - since_last_run, + ) + + note("mean: {}\nrange: {}\nnow: {}\nlow: {}\nhigh: {}\nsince last: {}".format( + mean, range_, datetime_now, low, high, since_last_run, + )) + + self.assertThat( + datetime.utcfromtimestamp(maintenance_call.getTime()), + between(low, high), + ) + + @given( + randoms(), + clocks(), + ) + def test_clean_up_when_stopped(self, random, clock): + """ + When the service is stopped, the delayed call in the reactor is removed. + """ + service = lease_maintenance_service( + lambda: None, + clock, + FilePath(self.useFixture(TempDir()).join(u"last-run")), + random, + ) + service.startService() + self.assertThat( + maybeDeferred(service.stopService), + succeeded(Is(None)), + ) + self.assertThat( + clock.getDelayedCalls(), + Equals([]), + ) + self.assertThat( + service.running, + Equals(False), + ) + + @given( + randoms(), + clocks(), + ) + def test_nodes_visited(self, random, clock): + """ + When the service runs, it calls the ``maintain_leases`` object. + """ + leases_maintained_at = [] + def maintain_leases(): + leases_maintained_at.append(datetime.utcfromtimestamp(clock.seconds())) + + service = lease_maintenance_service( + maintain_leases, + clock, + FilePath(self.useFixture(TempDir()).join(u"last-run")), + random, + ) + service.startService() + [maintenance_call] = clock.getDelayedCalls() + clock.advance(maintenance_call.getTime() - clock.seconds()) + + self.assertThat( + leases_maintained_at, + Equals([datetime.utcfromtimestamp(clock.seconds())]), + ) + + +class VisitStorageIndexesFromRootTests(TestCase): + """ + Tests for ``visit_storage_indexes_from_root``. + """ + @given(node_hierarchies(), clocks()) + def test_visits_all_nodes(self, root_node, clock): + """ + The operation calls the specified visitor with every node from the root to + its deepest children. + """ + visited = [] + def perform_visit(visit_assets): + return visit_assets(visited.append) + + operation = visit_storage_indexes_from_root( + perform_visit, + root_node, + ) + + self.assertThat( + operation(), + succeeded(Always()), + ) + expected = root_node.flatten() + self.assertThat( + visited, + MatchesAll( + HasLength(len(expected)), + AfterPreprocessing( + set, + Equals(set( + node.get_storage_index() + for node + in expected + )), + ), + ), + ) + + +class RenewLeasesTests(TestCase): + """ + Tests for ``renew_leases``. + """ + @given(storage_brokers(clocks()), lists(leaf_nodes())) + def test_renewed(self, storage_broker, nodes): + """ + ``renew_leases`` renews the leases of shares on all storage servers which + have no more than the specified amount of time remaining on their + current lease. 
+ """ + lease_secret = b"\0" * CRYPTO_VAL_SIZE + convergence_secret = b"\1" * CRYPTO_VAL_SIZE + secret_holder = SecretHolder(lease_secret, convergence_secret) + min_lease_remaining = timedelta(days=3) + + def get_now(): + return datetime.utcfromtimestamp( + storage_broker.clock.seconds(), + ) + + def visit_assets(visit): + for node in nodes: + visit(node.get_storage_index()) + return succeed(None) + + d = renew_leases( + visit_assets, + storage_broker, + secret_holder, + min_lease_remaining, + get_now, + ) + self.assertThat( + d, + succeeded(Always()), + ) + + relevant_storage_indexes = set( + node.get_storage_index() + for node + in nodes + ) + + self.assertThat( + storage_broker.get_connected_servers(), + AllMatch(leases_current( + relevant_storage_indexes, + get_now(), + min_lease_remaining, + )), + ) + + +class MaintainLeasesFromRootTests(TestCase): + """ + Tests for ``maintain_leases_from_root``. + """ + @given(storage_brokers(clocks()), node_hierarchies()) + def test_renewed(self, storage_broker, root_node): + """ + ``maintain_leases_from_root`` creates an operation which renews the leases + of shares on all storage servers which have no more than the specified + amount of time remaining on their current lease. + """ + lease_secret = b"\0" * CRYPTO_VAL_SIZE + convergence_secret = b"\1" * CRYPTO_VAL_SIZE + secret_holder = SecretHolder(lease_secret, convergence_secret) + min_lease_remaining = timedelta(days=3) + + def get_now(): + return datetime.utcfromtimestamp( + storage_broker.clock.seconds(), + ) + + operation = maintain_leases_from_root( + root_node, + storage_broker, + secret_holder, + min_lease_remaining, + get_now, + ) + d = operation() + self.assertThat( + d, + succeeded(Always()), + ) + + relevant_storage_indexes = set( + node.get_storage_index() + for node + in root_node.flatten() + ) + + self.assertThat( + storage_broker.get_connected_servers(), + AllMatch(leases_current( + relevant_storage_indexes, + get_now(), + min_lease_remaining, + )) + ) diff --git a/src/_zkapauthorizer/tests/test_plugin.py b/src/_zkapauthorizer/tests/test_plugin.py index 9c97ea7dc597f672aa4f417d9673100df76e2a38..325a87b941320179ec63501616625973e868f82c 100644 --- a/src/_zkapauthorizer/tests/test_plugin.py +++ b/src/_zkapauthorizer/tests/test_plugin.py @@ -23,7 +23,10 @@ from __future__ import ( from io import ( BytesIO, ) - +from os import ( + makedirs, +) +import tempfile from zope.interface import ( implementer, ) @@ -53,6 +56,7 @@ from hypothesis import ( from hypothesis.strategies import ( just, datetimes, + sampled_from, ) from foolscap.broker import ( Broker, @@ -71,6 +75,9 @@ from allmydata.interfaces import ( IStorageServer, RIStorageServer, ) +from allmydata.client import ( + create_client_from_config, +) from twisted.python.filepath import ( FilePath, @@ -94,6 +101,9 @@ from ..model import ( from ..controller import ( IssuerConfigurationMismatch, ) +from ..lease_maintenance import ( + SERVICE_NAME, +) from .strategies import ( minimal_tahoe_configs, @@ -391,3 +401,91 @@ class ClientResourceTests(TestCase): storage_server.get_client_resource(config), Provides([IResource]), ) + + +SERVERS_YAML = b""" +storage: + v0-aaaaaaaa: + ann: + anonymous-storage-FURL: pb://@tcp:/ + nickname: 10.0.0.2 + storage-options: + - name: privatestorageio-zkapauthz-v1 + ristretto-issuer-root-url: https://payments.example.com/ + storage-server-FURL: pb://bbbbbbbb@tcp:10.0.0.2:1234/cccccccc +""" + +TWO_SERVERS_YAML = b""" +storage: + v0-aaaaaaaa: + ann: + anonymous-storage-FURL: pb://@tcp:/ + nickname: 10.0.0.2 + 
storage-options: + - name: privatestorageio-zkapauthz-v1 + ristretto-issuer-root-url: https://payments.example.com/ + storage-server-FURL: pb://bbbbbbbb@tcp:10.0.0.2:1234/cccccccc + v0-dddddddd: + ann: + anonymous-storage-FURL: pb://@tcp:/ + nickname: 10.0.0.3 + storage-options: + - name: privatestorageio-zkapauthz-v1 + ristretto-issuer-root-url: https://payments.example.com/ + storage-server-FURL: pb://eeeeeeee@tcp:10.0.0.3:1234/ffffffff +""" + + +class LeaseMaintenanceServiceTests(TestCase): + """ + Tests for the plugin's initialization of the lease maintenance service. + """ + def _created_test(self, get_config, servers_yaml): + original_tempdir = tempfile.tempdir + + tempdir = self.useFixture(TempDir()) + nodedir = tempdir.join(b"node") + privatedir = tempdir.join(b"node", b"private") + makedirs(privatedir) + config = get_config(nodedir, b"tub.port") + + # Provide it a statically configured server to connect to. + config.write_private_config( + b"servers.yaml", + servers_yaml, + ) + config.write_private_config( + b"rootcap", + b"dddddddd", + ) + + try: + d = create_client_from_config(config) + self.assertThat( + d, + succeeded( + AfterPreprocessing( + lambda client: client.getServiceNamed(SERVICE_NAME), + Always(), + ), + ), + ) + finally: + # create_client_from_config (indirectly) rewrites tempfile.tempdir + # in a destructive manner that fails most of the rest of the test + # suite if we don't clean it up. We can't do this with a tearDown + # or a fixture or an addCleanup because hypothesis doesn't run any + # of those at the right time. :/ + tempfile.tempdir = original_tempdir + + @given( + tahoe_configs_with_dummy_redeemer, + sampled_from([SERVERS_YAML, TWO_SERVERS_YAML]), + ) + def test_created(self, get_config, servers_yaml): + """ + A client created from a configuration with the plugin enabled has a lease + maintenance service after it has at least one storage server to + connect to. + """ + return self._created_test(get_config, servers_yaml) diff --git a/zkapauthorizer.nix b/zkapauthorizer.nix index e97afba83b8986e338e61f727bb0d5281c293779..e23ddbc76b1cd09d2c6374c32af14cb0b7de4c75 100644 --- a/zkapauthorizer.nix +++ b/zkapauthorizer.nix @@ -1,4 +1,5 @@ -{ buildPythonPackage, sphinx +{ lib +, buildPythonPackage, sphinx , attrs, zope_interface, aniso8601, twisted, tahoe-lafs, privacypass, treq , fixtures, testtools, hypothesis, pyflakes, coverage , hypothesisProfile ? null @@ -9,13 +10,15 @@ let hypothesisProfile' = if hypothesisProfile == null then "default" else hypothesisProfile; testSuite' = if testSuite == null then "_zkapauthorizer" else testSuite; - extraTrialArgs = builtins.concatStringsSep " " (if trialArgs == null then ["--rterrors" "--jobs=4" ] else trialArgs); + defaultTrialArgs = [ "--rterrors" ] ++ ( lib.optional ( ! collectCoverage ) "--jobs=$NIX_BUILD_CORES" ); + trialArgs' = if trialArgs == null then defaultTrialArgs else trialArgs; + extraTrialArgs = builtins.concatStringsSep " " trialArgs'; in buildPythonPackage rec { version = "0.0"; pname = "zero-knowledge-access-pass-authorizer"; name = "${pname}-${version}"; - src = ./.; + src = lib.cleanSource ./.; outputs = [ "out" ] ++ (if collectCoverage then [ "doc" ] else [ ]);