diff --git a/src/_zkapauthorizer/_plugin.py b/src/_zkapauthorizer/_plugin.py index 262db7788a568f0c0aadfe167c40385518e3528a..1ec0f75305d2307cafeedd7187f08ef3e1779fa0 100644 --- a/src/_zkapauthorizer/_plugin.py +++ b/src/_zkapauthorizer/_plugin.py @@ -230,6 +230,11 @@ def _create_maintenance_service(reactor, node_config, client_node): def get_now(): return datetime.utcfromtimestamp(reactor.seconds()) + from twisted.plugins.zkapauthorizer import ( + storage_server, + ) + store = storage_server._get_store(node_config) + # Create the operation which performs the lease maintenance job when # called. maintain_leases = maintain_leases_from_root( @@ -240,6 +245,7 @@ def _create_maintenance_service(reactor, node_config, client_node): client_node._secret_holder, # Make this configuration timedelta(days=3), + store.start_lease_maintenance, get_now, ) last_run_path = FilePath(node_config.get_private_path(b"last-lease-maintenance-run")) diff --git a/src/_zkapauthorizer/lease_maintenance.py b/src/_zkapauthorizer/lease_maintenance.py index e7e0bb3c3d53cf64b89e865692c372323b031924..e264f32ca8a0dd60b362da5ca01150563d092173 100644 --- a/src/_zkapauthorizer/lease_maintenance.py +++ b/src/_zkapauthorizer/lease_maintenance.py @@ -29,6 +29,10 @@ from errno import ( ) import attr +from zope.interface import ( + implementer, +) + from aniso8601 import ( parse_datetime, ) @@ -56,8 +60,13 @@ from .controller import ( bracket, ) +from .model import ( + ILeaseMaintenanceObserver, +) + SERVICE_NAME = u"lease maintenance service" + @inlineCallbacks def visit_storage_indexes(root_node, visit): """ @@ -107,7 +116,14 @@ def iter_storage_indexes(visit_assets): @inlineCallbacks -def renew_leases(visit_assets, storage_broker, secret_holder, min_lease_remaining, now): +def renew_leases( + visit_assets, + storage_broker, + secret_holder, + min_lease_remaining, + get_activity_observer, + now, +): """ Check the leases on a group of nodes for those which are expired or close to expiring and renew such leases. @@ -124,12 +140,17 @@ def renew_leases(visit_assets, storage_broker, secret_holder, min_lease_remainin :param timedelta min_lease_remaining: The minimum amount of time remaining to allow on a lease without renewing it. + :param get_activity_observer: A no-argument callable which returns an + ``ILeaseMaintenanceObserver``. + :param now: A no-argument function returning the current time, as a datetime instance, for comparison against lease expiration time. :return Deferred: A Deferred which fires when all visitable nodes have been checked and any leases renewed which required it. """ + activity = get_activity_observer() + storage_indexes = yield iter_storage_indexes(visit_assets) renewal_secret = secret_holder.get_renewal_secret() @@ -142,9 +163,12 @@ def renew_leases(visit_assets, storage_broker, secret_holder, min_lease_remainin renewal_secret, storage_indexes, server, + activity, now(), ) + activity.finish() + @inlineCallbacks def renew_leases_on_server( @@ -152,6 +176,7 @@ def renew_leases_on_server( renewal_secret, storage_indexes, server, + activity, now, ): """ @@ -169,6 +194,9 @@ def renew_leases_on_server( :param StorageServer server: The storage server on which to check. + :param ILeaseMaintenanceObserver activity: An object which will receive + events allowing it to observe the lease maintenance activity. + :param datetime now: The current time for comparison against the least expiration time. @@ -180,6 +208,10 @@ def renew_leases_on_server( if not stat_dict: # The server has no shares for this storage index. continue + + # Keep track of what's been seen. + activity.observe([stat.size for stat in stat_dict.values()]) + # All shares have the same lease information. stat = stat_dict.popitem()[1] if needs_lease_renew(min_lease_remaining, stat, now): @@ -421,7 +453,42 @@ def visit_storage_indexes_from_root(visitor, root_node): ) -def maintain_leases_from_root(root_node, storage_broker, secret_holder, min_lease_remaining, get_now): +@implementer(ILeaseMaintenanceObserver) +class NoopMaintenanceObserver(object): + """ + A lease maintenance observer that does nothing. + """ + def observe(self, sizes): + pass + + def finish(self): + pass + + +@implementer(ILeaseMaintenanceObserver) +@attr.s +class MemoryMaintenanceObserver(object): + """ + A lease maintenance observer that records observations in memory. + """ + observed = attr.ib(default=attr.Factory(list)) + finished = attr.ib(default=False) + + def observe(self, sizes): + self.observed.append(sizes) + + def finish(self): + self.finished = True + + +def maintain_leases_from_root( + root_node, + storage_broker, + secret_holder, + min_lease_remaining, + progress, + get_now, +): """ An operation for ``lease_maintenance_service`` which visits ``root_node`` and all its children and renews their leases if they have @@ -452,6 +519,7 @@ def maintain_leases_from_root(root_node, storage_broker, secret_holder, min_leas storage_broker, secret_holder, min_lease_remaining, + progress, get_now, ) diff --git a/src/_zkapauthorizer/model.py b/src/_zkapauthorizer/model.py index 2dff026a4c112391c368bcb0fc4edd7442aa9d92..52bd98e41bb1e93fce19849fbafc9908800c76c5 100644 --- a/src/_zkapauthorizer/model.py +++ b/src/_zkapauthorizer/model.py @@ -27,6 +27,12 @@ from json import ( from datetime import ( datetime, ) + +from zope.interface import ( + Interface, + implementer, +) + from sqlite3 import ( OperationalError, connect as _connect, @@ -46,6 +52,25 @@ from .storage_common import ( required_passes, ) + +class ILeaseMaintenanceObserver(Interface): + """ + An object which is interested in receiving events related to the progress + of lease maintenance activity. + """ + def observe(sizes): + """ + Observe some shares encountered during lease maintenance. + + :param list[int] sizes: The sizes of the shares encountered. + """ + + def finish(): + """ + Observe that a run of lease maintenance has completed. + """ + + class StoreOpenError(Exception): """ There was a problem opening the underlying data store. @@ -491,6 +516,7 @@ class VoucherStore(object): ) +@implementer(ILeaseMaintenanceObserver) @attr.s class LeaseMaintenance(object): """ diff --git a/src/_zkapauthorizer/tests/test_lease_maintenance.py b/src/_zkapauthorizer/tests/test_lease_maintenance.py index 0c1452d9a527400d87eeacec824bd64168b465a1..a59f776805f1ac53669cf145a1d7e67f992ab782 100644 --- a/src/_zkapauthorizer/tests/test_lease_maintenance.py +++ b/src/_zkapauthorizer/tests/test_lease_maintenance.py @@ -100,6 +100,8 @@ from .strategies import ( ) from ..lease_maintenance import ( + NoopMaintenanceObserver, + MemoryMaintenanceObserver, lease_maintenance_service, maintain_leases_from_root, visit_storage_indexes_from_root, @@ -133,8 +135,8 @@ class DummyStorageServer(object): """ A dummy implementation of ``IStorageServer`` from Tahoe-LAFS. - :ivar dict[bytes, datetime] buckets: A mapping from storage index to lease - expiration time for shares at that storage index. + :ivar dict[bytes, ShareStat] buckets: A mapping from storage index to + metadata about shares at that storage index. """ clock = attr.ib() buckets = attr.ib() @@ -438,6 +440,7 @@ class RenewLeasesTests(TestCase): storage_broker, secret_holder, min_lease_remaining, + NoopMaintenanceObserver, get_now, ) self.assertThat( @@ -487,6 +490,7 @@ class MaintainLeasesFromRootTests(TestCase): storage_broker, secret_holder, min_lease_remaining, + NoopMaintenanceObserver, get_now, ) d = operation() @@ -509,3 +513,53 @@ class MaintainLeasesFromRootTests(TestCase): min_lease_remaining, )) ) + + @given(storage_brokers(clocks()), node_hierarchies()) + def test_activity_observed(self, storage_broker, root_node): + """ + ``maintain_leases_from_root`` creates an operation which uses the given + activity observer to report its progress. + """ + lease_secret = b"\0" * CRYPTO_VAL_SIZE + convergence_secret = b"\1" * CRYPTO_VAL_SIZE + secret_holder = SecretHolder(lease_secret, convergence_secret) + min_lease_remaining = timedelta(days=3) + + def get_now(): + return datetime.utcfromtimestamp( + storage_broker.clock.seconds(), + ) + + observer = MemoryMaintenanceObserver() + # There is only one available. + observers = [observer] + progress = observers.pop + operation = maintain_leases_from_root( + root_node, + storage_broker, + secret_holder, + min_lease_remaining, + progress, + get_now, + ) + d = operation() + self.assertThat( + d, + succeeded(Always()), + ) + + expected = [] + for node in root_node.flatten(): + for storage_server in storage_broker.get_connected_servers(): + try: + stat = storage_server.buckets[node._storage_index] + except KeyError: + continue + else: + # DummyStorageServer always pretends to have only one share + expected.append([stat.size]) + + self.assertThat( + observer.observed, + Equals(expected), + )