diff --git a/src/_zkapauthorizer/_plugin.py b/src/_zkapauthorizer/_plugin.py index 262db7788a568f0c0aadfe167c40385518e3528a..1ec0f75305d2307cafeedd7187f08ef3e1779fa0 100644 --- a/src/_zkapauthorizer/_plugin.py +++ b/src/_zkapauthorizer/_plugin.py @@ -230,6 +230,11 @@ def _create_maintenance_service(reactor, node_config, client_node): def get_now(): return datetime.utcfromtimestamp(reactor.seconds()) + from twisted.plugins.zkapauthorizer import ( + storage_server, + ) + store = storage_server._get_store(node_config) + # Create the operation which performs the lease maintenance job when # called. maintain_leases = maintain_leases_from_root( @@ -240,6 +245,7 @@ def _create_maintenance_service(reactor, node_config, client_node): client_node._secret_holder, # Make this configuration timedelta(days=3), + store.start_lease_maintenance, get_now, ) last_run_path = FilePath(node_config.get_private_path(b"last-lease-maintenance-run")) diff --git a/src/_zkapauthorizer/lease_maintenance.py b/src/_zkapauthorizer/lease_maintenance.py index e7e0bb3c3d53cf64b89e865692c372323b031924..e264f32ca8a0dd60b362da5ca01150563d092173 100644 --- a/src/_zkapauthorizer/lease_maintenance.py +++ b/src/_zkapauthorizer/lease_maintenance.py @@ -29,6 +29,10 @@ from errno import ( ) import attr +from zope.interface import ( + implementer, +) + from aniso8601 import ( parse_datetime, ) @@ -56,8 +60,13 @@ from .controller import ( bracket, ) +from .model import ( + ILeaseMaintenanceObserver, +) + SERVICE_NAME = u"lease maintenance service" + @inlineCallbacks def visit_storage_indexes(root_node, visit): """ @@ -107,7 +116,14 @@ def iter_storage_indexes(visit_assets): @inlineCallbacks -def renew_leases(visit_assets, storage_broker, secret_holder, min_lease_remaining, now): +def renew_leases( + visit_assets, + storage_broker, + secret_holder, + min_lease_remaining, + get_activity_observer, + now, +): """ Check the leases on a group of nodes for those which are expired or close to expiring and renew such leases. @@ -124,12 +140,17 @@ def renew_leases(visit_assets, storage_broker, secret_holder, min_lease_remainin :param timedelta min_lease_remaining: The minimum amount of time remaining to allow on a lease without renewing it. + :param get_activity_observer: A no-argument callable which returns an + ``ILeaseMaintenanceObserver``. + :param now: A no-argument function returning the current time, as a datetime instance, for comparison against lease expiration time. :return Deferred: A Deferred which fires when all visitable nodes have been checked and any leases renewed which required it. """ + activity = get_activity_observer() + storage_indexes = yield iter_storage_indexes(visit_assets) renewal_secret = secret_holder.get_renewal_secret() @@ -142,9 +163,12 @@ def renew_leases(visit_assets, storage_broker, secret_holder, min_lease_remainin renewal_secret, storage_indexes, server, + activity, now(), ) + activity.finish() + @inlineCallbacks def renew_leases_on_server( @@ -152,6 +176,7 @@ def renew_leases_on_server( renewal_secret, storage_indexes, server, + activity, now, ): """ @@ -169,6 +194,9 @@ def renew_leases_on_server( :param StorageServer server: The storage server on which to check. + :param ILeaseMaintenanceObserver activity: An object which will receive + events allowing it to observe the lease maintenance activity. + :param datetime now: The current time for comparison against the least expiration time. @@ -180,6 +208,10 @@ def renew_leases_on_server( if not stat_dict: # The server has no shares for this storage index. continue + + # Keep track of what's been seen. + activity.observe([stat.size for stat in stat_dict.values()]) + # All shares have the same lease information. stat = stat_dict.popitem()[1] if needs_lease_renew(min_lease_remaining, stat, now): @@ -421,7 +453,42 @@ def visit_storage_indexes_from_root(visitor, root_node): ) -def maintain_leases_from_root(root_node, storage_broker, secret_holder, min_lease_remaining, get_now): +@implementer(ILeaseMaintenanceObserver) +class NoopMaintenanceObserver(object): + """ + A lease maintenance observer that does nothing. + """ + def observe(self, sizes): + pass + + def finish(self): + pass + + +@implementer(ILeaseMaintenanceObserver) +@attr.s +class MemoryMaintenanceObserver(object): + """ + A lease maintenance observer that records observations in memory. + """ + observed = attr.ib(default=attr.Factory(list)) + finished = attr.ib(default=False) + + def observe(self, sizes): + self.observed.append(sizes) + + def finish(self): + self.finished = True + + +def maintain_leases_from_root( + root_node, + storage_broker, + secret_holder, + min_lease_remaining, + progress, + get_now, +): """ An operation for ``lease_maintenance_service`` which visits ``root_node`` and all its children and renews their leases if they have @@ -452,6 +519,7 @@ def maintain_leases_from_root(root_node, storage_broker, secret_holder, min_leas storage_broker, secret_holder, min_lease_remaining, + progress, get_now, ) diff --git a/src/_zkapauthorizer/model.py b/src/_zkapauthorizer/model.py index 65d653ddf94ceaa3435fbaf9a2bb690029f4de4b..52bd98e41bb1e93fce19849fbafc9908800c76c5 100644 --- a/src/_zkapauthorizer/model.py +++ b/src/_zkapauthorizer/model.py @@ -27,6 +27,12 @@ from json import ( from datetime import ( datetime, ) + +from zope.interface import ( + Interface, + implementer, +) + from sqlite3 import ( OperationalError, connect as _connect, @@ -41,6 +47,29 @@ from twisted.python.filepath import ( FilePath, ) +from .storage_common import ( + BYTES_PER_PASS, + required_passes, +) + + +class ILeaseMaintenanceObserver(Interface): + """ + An object which is interested in receiving events related to the progress + of lease maintenance activity. + """ + def observe(sizes): + """ + Observe some shares encountered during lease maintenance. + + :param list[int] sizes: The sizes of the shares encountered. + """ + + def finish(): + """ + Observe that a run of lease maintenance has completed. + """ + class StoreOpenError(Exception): """ @@ -157,6 +186,26 @@ def open_and_initialize(path, required_schema_version, connect=None): ) """, ) + cursor.execute( + """ + CREATE TABLE IF NOT EXISTS [lease-maintenance-spending] ( + [id] integer, -- A unique identifier for a group of activity. + [started] text, -- ISO8601 date+time string when the activity began. + [finished] text, -- ISO8601 date+time string when the activity completed (or null). + + -- The number of passes that would be required to renew all + -- shares encountered during this activity. Note that because + -- leases on different shares don't necessarily expire at the + -- same time this is not necessarily the number of passes + -- **actually** used during this activity. Some shares may + -- not have required lease renewal. Also note that while the + -- activity is ongoing this value may change. + [count] integer, + + PRIMARY KEY([id]) + ) + """, + ) return conn @@ -427,6 +476,136 @@ class VoucherStore(object): u"unblinded-tokens": list(token for (token,) in tokens), } + def start_lease_maintenance(self): + """ + Get an object which can track a newly started round of lease maintenance + activity. + + :return LeaseMaintenance: A new, started lease maintenance object. + """ + m = LeaseMaintenance(self.now, self._connection) + m.start() + return m + + @with_cursor + def get_latest_lease_maintenance_activity(self, cursor): + """ + Get a description of the most recently completed lease maintenance + activity. + + :return LeaseMaintenanceActivity|None: If any lease maintenance has + completed, an object describing its results. Otherwise, None. + """ + cursor.execute( + """ + SELECT [started], [count], [finished] + FROM [lease-maintenance-spending] + WHERE [finished] IS NOT NULL + ORDER BY [finished] DESC + LIMIT 1 + """, + ) + activity = cursor.fetchall() + if len(activity) == 0: + return None + [(started, count, finished)] = activity + return LeaseMaintenanceActivity( + parse_datetime(started, delimiter=u" "), + count, + parse_datetime(finished, delimiter=u" "), + ) + + +@implementer(ILeaseMaintenanceObserver) +@attr.s +class LeaseMaintenance(object): + """ + A state-updating helper for recording pass usage during a lease + maintenance run. + + Get one of these from ``VoucherStore.start_lease_maintenance``. Then use + the ``observe`` and ``finish`` methods to persist state about a lease + maintenance run. + + :ivar _now: A no-argument callable which returns a datetime giving a time + to use as current. + + :ivar _connection: A SQLite3 connection object to use to persist observed + information. + + :ivar _rowid: None for unstarted lease maintenance objects. For started + objects, the database row id that corresponds to the started run. + This is used to make sure future updates go to the right row. + """ + _now = attr.ib() + _connection = attr.ib() + _rowid = attr.ib(default=None) + + @with_cursor + def start(self, cursor): + """ + Record the start of a lease maintenance run. + """ + if self._rowid is not None: + raise Exception("Cannot re-start a particular _LeaseMaintenance.") + + cursor.execute( + """ + INSERT INTO [lease-maintenance-spending] ([started], [finished], [count]) + VALUES (?, ?, ?) + """, + (self._now(), None, 0), + ) + self._rowid = cursor.lastrowid + + @with_cursor + def observe(self, cursor, sizes): + """ + Record a storage shares of the given sizes. + """ + count = required_passes(BYTES_PER_PASS, sizes) + cursor.execute( + """ + UPDATE [lease-maintenance-spending] + SET [count] = [count] + ? + WHERE [id] = ? + """, + (count, self._rowid), + ) + + @with_cursor + def finish(self, cursor): + """ + Record the completion of this lease maintenance run. + """ + cursor.execute( + """ + UPDATE [lease-maintenance-spending] + SET [finished] = ? + WHERE [id] = ? + """, + (self._now(), self._rowid), + ) + self._rowid = None + + +@attr.s +class LeaseMaintenanceActivity(object): + started = attr.ib() + passes_required = attr.ib() + finished = attr.ib() + + +# store = ... +# x = store.start_lease_maintenance() +# x.observe(size=123) +# x.observe(size=456) +# ... +# x.finish() +# +# x = store.get_latest_lease_maintenance_activity() +# xs.started, xs.passes_required, xs.finished + @attr.s(frozen=True) class UnblindedToken(object): diff --git a/src/_zkapauthorizer/tests/strategies.py b/src/_zkapauthorizer/tests/strategies.py index b4d5daaca56d9a28e0c18bea6ff994580752c9ba..4ba150cd8589c160bf895fc4dc6277100f93ca8e 100644 --- a/src/_zkapauthorizer/tests/strategies.py +++ b/src/_zkapauthorizer/tests/strategies.py @@ -657,7 +657,18 @@ def node_hierarchies(): Build hierarchies of ``IDirectoryNode`` and other ``IFilesystemNode`` (incomplete) providers. """ + def storage_indexes_are_distinct(nodes): + seen = set() + for n in nodes.flatten(): + si = n.get_storage_index() + if si in seen: + return False + seen.add(si) + return True + return recursive( leaf_nodes(), directory_nodes, + ).filter( + storage_indexes_are_distinct, ) diff --git a/src/_zkapauthorizer/tests/test_lease_maintenance.py b/src/_zkapauthorizer/tests/test_lease_maintenance.py index 0c1452d9a527400d87eeacec824bd64168b465a1..7d4df1de10ab11a5bdb1df2522c97d2bfce64d6c 100644 --- a/src/_zkapauthorizer/tests/test_lease_maintenance.py +++ b/src/_zkapauthorizer/tests/test_lease_maintenance.py @@ -100,6 +100,8 @@ from .strategies import ( ) from ..lease_maintenance import ( + NoopMaintenanceObserver, + MemoryMaintenanceObserver, lease_maintenance_service, maintain_leases_from_root, visit_storage_indexes_from_root, @@ -133,8 +135,8 @@ class DummyStorageServer(object): """ A dummy implementation of ``IStorageServer`` from Tahoe-LAFS. - :ivar dict[bytes, datetime] buckets: A mapping from storage index to lease - expiration time for shares at that storage index. + :ivar dict[bytes, ShareStat] buckets: A mapping from storage index to + metadata about shares at that storage index. """ clock = attr.ib() buckets = attr.ib() @@ -438,6 +440,7 @@ class RenewLeasesTests(TestCase): storage_broker, secret_holder, min_lease_remaining, + NoopMaintenanceObserver, get_now, ) self.assertThat( @@ -487,6 +490,7 @@ class MaintainLeasesFromRootTests(TestCase): storage_broker, secret_holder, min_lease_remaining, + NoopMaintenanceObserver, get_now, ) d = operation() @@ -509,3 +513,59 @@ class MaintainLeasesFromRootTests(TestCase): min_lease_remaining, )) ) + + @given(storage_brokers(clocks()), node_hierarchies()) + def test_activity_observed(self, storage_broker, root_node): + """ + ``maintain_leases_from_root`` creates an operation which uses the given + activity observer to report its progress. + """ + lease_secret = b"\0" * CRYPTO_VAL_SIZE + convergence_secret = b"\1" * CRYPTO_VAL_SIZE + secret_holder = SecretHolder(lease_secret, convergence_secret) + min_lease_remaining = timedelta(days=3) + + def get_now(): + return datetime.utcfromtimestamp( + storage_broker.clock.seconds(), + ) + + observer = MemoryMaintenanceObserver() + # There is only one available. + observers = [observer] + progress = observers.pop + operation = maintain_leases_from_root( + root_node, + storage_broker, + secret_holder, + min_lease_remaining, + progress, + get_now, + ) + d = operation() + self.assertThat( + d, + succeeded(Always()), + ) + + expected = [] + for node in root_node.flatten(): + for storage_server in storage_broker.get_connected_servers(): + try: + stat = storage_server.buckets[node.get_storage_index()] + except KeyError: + continue + else: + # DummyStorageServer always pretends to have only one share + expected.append([stat.size]) + + # The visit order doesn't matter. + expected.sort() + + self.assertThat( + observer.observed, + AfterPreprocessing( + sorted, + Equals(expected), + ), + ) diff --git a/src/_zkapauthorizer/tests/test_model.py b/src/_zkapauthorizer/tests/test_model.py index d31e669a9f2fd108fa67e25aa655ef747707e246..9e7b779f7a842022437085b7082b5732873b2b3f 100644 --- a/src/_zkapauthorizer/tests/test_model.py +++ b/src/_zkapauthorizer/tests/test_model.py @@ -27,6 +27,9 @@ from os import ( from errno import ( EACCES, ) +from datetime import ( + timedelta, +) from testtools import ( TestCase, @@ -52,7 +55,9 @@ from hypothesis import ( from hypothesis.strategies import ( data, lists, + tuples, datetimes, + timedeltas, integers, ) @@ -60,6 +65,10 @@ from twisted.python.filepath import ( FilePath, ) +from ..storage_common import ( + BYTES_PER_PASS, +) + from ..model import ( SchemaError, StoreOpenError, @@ -68,6 +77,7 @@ from ..model import ( Pending, DoubleSpend, Redeemed, + LeaseMaintenanceActivity, open_and_initialize, memory_connect, ) @@ -78,6 +88,7 @@ from .strategies import ( voucher_objects, random_tokens, unblinded_tokens, + posix_safe_datetimes, ) from .fixtures import ( TemporaryVoucherStore, @@ -217,7 +228,6 @@ class VoucherStoreTests(TestCase): If the underlying database file cannot be opened then ``VoucherStore.from_node_config`` raises ``StoreOpenError``. """ - tempdir = self.useFixture(TempDir()) nodedir = tempdir.join(b"node") @@ -238,6 +248,75 @@ class VoucherStoreTests(TestCase): ) +class LeaseMaintenanceTests(TestCase): + """ + Tests for the lease-maintenance related parts of ``VoucherStore``. + """ + @given( + tahoe_configs(), + posix_safe_datetimes(), + lists( + tuples( + # How much time passes before this activity starts + timedeltas(min_value=timedelta(1), max_value=timedelta(days=1)), + # Some activity. This list of two tuples gives us a trivial + # way to compute the total passes required (just sum the pass + # counts in it). This is nice because it avoids having the + # test re-implement size quantization which would just be + # repeated code duplicating the implementation. The second + # value lets us fuzz the actual size values a little bit in a + # way which shouldn't affect the passes required. + lists( + tuples( + # The activity itself, in pass count + integers(min_value=1, max_value=2 ** 16 - 1), + # Amount by which to trim back the share sizes + integers(min_value=0, max_value=BYTES_PER_PASS - 1), + ), + ), + # How much time passes before this activity finishes + timedeltas(min_value=timedelta(1), max_value=timedelta(days=1)), + ), + ), + ) + def test_lease_maintenance_activity(self, get_config, now, activity): + """ + ``VoucherStore.get_latest_lease_maintenance_activity`` returns a + ``LeaseMaintenanceTests`` with fields reflecting the most recently + finished lease maintenance activity. + """ + store = self.useFixture( + TemporaryVoucherStore(get_config, lambda: now), + ).store + + expected = None + for (start_delay, sizes, finish_delay) in activity: + now += start_delay + started = now + x = store.start_lease_maintenance() + passes_required = 0 + for (num_passes, trim_size) in sizes: + passes_required += num_passes + x.observe([ + num_passes * BYTES_PER_PASS - trim_size, + ]) + now += finish_delay + x.finish() + finished = now + + # Let the last iteration of the loop define the expected value. + expected = LeaseMaintenanceActivity( + started, + passes_required, + finished, + ) + + self.assertThat( + store.get_latest_lease_maintenance_activity(), + Equals(expected), + ) + + class VoucherTests(TestCase): """ Tests for ``Voucher``.