diff --git a/src/_zkapauthorizer/_storage_server.py b/src/_zkapauthorizer/_storage_server.py index 392f2ebcd8a7b104369a239c607866601197c01f..09934d352a1552365e2c8a3deb20333ee4f5debd 100644 --- a/src/_zkapauthorizer/_storage_server.py +++ b/src/_zkapauthorizer/_storage_server.py @@ -33,6 +33,9 @@ from struct import calcsize, unpack import attr from allmydata.interfaces import RIStorageServer from allmydata.storage.common import storage_index_to_dir +from allmydata.storage.immutable import ShareFile +from allmydata.storage.mutable import MutableShareFile +from allmydata.storage.shares import get_share_file from allmydata.util.base32 import b2a from attr.validators import instance_of, provides from challenge_bypass_ristretto import SigningKey, TokenPreimage, VerificationSignature @@ -278,6 +281,7 @@ class ZKAPAuthorizerStorageServer(Referenceable): ) def remote_stat_shares(self, storage_indexes_or_slots): + # type: (List[bytes]) -> List[Dict[int, ShareStat]] return list( dict(stat_share(self._original, storage_index_or_slot)) for storage_index_or_slot in storage_indexes_or_slots @@ -616,28 +620,13 @@ def get_storage_index_share_size(sharepath): return share_file_size - header_size - (number_of_leases * (4 + 32 + 32 + 4)) -def get_lease_expiration(get_leases, storage_index_or_slot): - """ - Get the lease expiration time for the shares in a bucket or slot, or None - if there is no lease on them. - - :param get_leases: A one-argument callable which returns the leases. - - :param storage_index_or_slot: Either a storage index or a slot identifying - the shares the leases of which to inspect. - """ - for lease in get_leases(storage_index_or_slot): - return lease.get_expiration_time() - return None - - def stat_bucket(storage_server, storage_index, sharepath): """ Get a ``ShareStat`` for the shares in a bucket. """ return ShareStat( size=get_storage_index_share_size(sharepath), - lease_expiration=get_lease_expiration(storage_server.get_leases, storage_index), + lease_expiration=get_lease_expiration(sharepath), ) @@ -647,10 +636,26 @@ def stat_slot(storage_server, slot, sharepath): """ return ShareStat( size=get_slot_share_size(sharepath), - lease_expiration=get_lease_expiration(storage_server.get_slot_leases, slot), + lease_expiration=get_lease_expiration(sharepath), ) +def get_lease_expiration(sharepath): + # type: (str) -> Optional[int] + """ + Get the latest lease expiration time for the share at the given path, or + ``None`` if there are no leases on it. + + :param sharepath: The path to the share file to inspect. + """ + leases = list( + lease.get_expiration_time() for lease in get_share_file(sharepath).get_leases() + ) + if leases: + return max(leases) + return None + + def get_slot_share_size(sharepath): """ Get the size of a share belonging to a slot (a mutable share). diff --git a/src/_zkapauthorizer/lease_maintenance.py b/src/_zkapauthorizer/lease_maintenance.py index 8858c91b85e42017a92131d41691cf5f69fd35fa..9fe3186224f334446e83e5ac829e47942d78a2e3 100644 --- a/src/_zkapauthorizer/lease_maintenance.py +++ b/src/_zkapauthorizer/lease_maintenance.py @@ -206,12 +206,31 @@ def renew_leases_on_server( # Keep track of what's been seen. activity.observe([stat.size for stat in stat_dict.values()]) - # All shares have the same lease information. - stat = stat_dict.popitem()[1] - if needs_lease_renew(min_lease_remaining, stat, now): + # Each share has its own leases and each lease has its own expiration + # time. For each share the server only returns the lease with the + # expiration time farthest in the future. + # + # There is no API for renewing leases on just *some* shares! It is + # all or nothing. So from the server's response we find the share + # that will have no active lease soonest and make our decision about + # whether to renew leases at this storage index or not based on that. + most_endangered = soonest_expiration(stat_dict.values()) + if needs_lease_renew(min_lease_remaining, most_endangered, now): yield renew_lease(renewal_secret, cancel_secret, storage_index, server) +def soonest_expiration(stats): + # type: (Iterable[ShareStat]) -> ShareStat + """ + :return: The share stat from ``stats`` with a lease which expires before + all others. + """ + return min( + stats, + key=lambda stat: stat.lease_expiration, + ) + + def renew_lease(renewal_secret, cancel_secret, storage_index, server): """ Renew the lease on the shares in one storage index on one server. diff --git a/src/_zkapauthorizer/tests/matchers.py b/src/_zkapauthorizer/tests/matchers.py index 4bfe7362bcfa96eb3198cb7ef6cc0ca3b0e9ea90..bf5ab30eeba9c029f70604c31952ed70597099cd 100644 --- a/src/_zkapauthorizer/tests/matchers.py +++ b/src/_zkapauthorizer/tests/matchers.py @@ -135,14 +135,17 @@ def leases_current(relevant_storage_indexes, now, min_lease_remaining): servers for which the leases on the given storage indexes do not expire before ``min_lease_remaining``. """ + + def get_relevant_stats(storage_server): + for (storage_index, shares) in storage_server.buckets.items(): + if storage_index in relevant_storage_indexes: + for (sharenum, stat) in shares.items(): + yield stat + return AfterPreprocessing( # Get share stats for storage indexes we should have # visited and maintained. - lambda storage_server: list( - stat - for (storage_index, stat) in storage_server.buckets.items() - if storage_index in relevant_storage_indexes - ), + lambda storage_server: list(get_relevant_stats(storage_server)), AllMatch( AfterPreprocessing( # Lease expiration for anything visited must be diff --git a/src/_zkapauthorizer/tests/test_lease_maintenance.py b/src/_zkapauthorizer/tests/test_lease_maintenance.py index 2a8743dda3fbf6298f316928f093dd3ce3df5b64..87345284ccf38f17272596dd71e1e6bf916bff6e 100644 --- a/src/_zkapauthorizer/tests/test_lease_maintenance.py +++ b/src/_zkapauthorizer/tests/test_lease_maintenance.py @@ -36,6 +36,8 @@ from hypothesis.strategies import ( just, lists, randoms, + sets, + tuples, ) from testtools import TestCase from testtools.matchers import ( @@ -64,7 +66,13 @@ from ..lease_maintenance import ( visit_storage_indexes_from_root, ) from .matchers import Provides, between, leases_current -from .strategies import clocks, leaf_nodes, node_hierarchies, storage_indexes +from .strategies import ( + clocks, + node_hierarchies, + posix_timestamps, + sharenums, + storage_indexes, +) def interval_means(): @@ -93,41 +101,40 @@ class DummyStorageServer(object): """ A dummy implementation of ``IStorageServer`` from Tahoe-LAFS. - :ivar dict[bytes, ShareStat] buckets: A mapping from storage index to + :ivar buckets: A mapping from storage index to metadata about shares at that storage index. """ clock = attr.ib() - buckets = attr.ib() + buckets = attr.ib() # type: Dict[bytes, Dict[int, ShareStat]] lease_seed = attr.ib() def stat_shares(self, storage_indexes): - return succeed( - list( - {0: self.buckets[idx]} if idx in self.buckets else {} - for idx in storage_indexes - ) - ) + # type: (List[bytes]) -> Deferred[List[Dict[int, ShareStat]]] + return succeed(list(self.buckets.get(idx, {}) for idx in storage_indexes)) def get_lease_seed(self): return self.lease_seed def add_lease(self, storage_index, renew_secret, cancel_secret): - self.buckets[storage_index].lease_expiration = ( - self.clock.seconds() + timedelta(days=31).total_seconds() - ) + for stat in self.buckets.get(storage_index, {}).values(): + stat.lease_expiration = ( + self.clock.seconds() + timedelta(days=31).total_seconds() + ) class SharesAlreadyExist(Exception): pass -def create_shares(storage_server, storage_index, size, lease_expiration): +def create_share(storage_server, storage_index, sharenum, size, lease_expiration): + # type: (DummyStorageServer, bytes, int, int, int) -> None """ - Initialize a storage index ("bucket") with some shares. + Add a share to a storage index ("bucket"). :param DummyServer storage_server: The server to populate with shares. :param bytes storage_index: The storage index of the shares. + :param sharenum: The share number to add. :param int size: The application data size of the shares. :param int lease_expiration: The expiration time for the lease to attach to the shares. @@ -137,11 +144,12 @@ def create_shares(storage_server, storage_index, size, lease_expiration): :return: ``None`` """ - if storage_index in storage_server.buckets: + if sharenum in storage_server.buckets.get(storage_index, {}): raise SharesAlreadyExist( "Cannot create shares for storage index where they already exist.", ) - storage_server.buckets[storage_index] = ShareStat( + bucket = storage_server.buckets.setdefault(storage_index, {}) + bucket[sharenum] = ShareStat( size=size, lease_expiration=lease_expiration, ) @@ -166,7 +174,7 @@ def storage_servers(clocks): return builds( DummyStorageServer, clocks, - dictionaries(storage_indexes(), share_stats()), + dictionaries(storage_indexes(), dictionaries(sharenums(), share_stats())), lease_seeds(), ).map( DummyServer, @@ -425,13 +433,51 @@ class VisitStorageIndexesFromRootTests(TestCase): ) +def lists_of_buckets(): + """ + Build lists of bucket descriptions. + + A bucket description is a two-tuple of a storage index and a dict mapping + share numbers to lease expiration times (as posix timestamps). Any given + storage index will appear only once in the overall result. + """ + + def add_expiration_times(sharenums): + return builds( + lambda nums, expires: dict(zip(nums, expires)), + just(sharenums), + lists( + posix_timestamps(), + min_size=len(sharenums), + max_size=len(sharenums), + ), + ) + + def buckets_strategy(count): + si_strategy = sets(storage_indexes(), min_size=count, max_size=count) + sharenum_strategy = lists( + sets(sharenums(), min_size=1).flatmap(add_expiration_times), + min_size=count, + max_size=count, + ) + expiration_strategy = lists + return builds( + zip, + si_strategy, + sharenum_strategy, + ) + + bucket_count_strategy = integers(min_value=0, max_value=100) + return bucket_count_strategy.flatmap(buckets_strategy) + + class RenewLeasesTests(TestCase): """ Tests for ``renew_leases``. """ - @given(storage_brokers(clocks()), lists(leaf_nodes(), unique=True)) - def test_renewed(self, storage_broker, nodes): + @given(storage_brokers(clocks()), lists_of_buckets()) + def test_renewed(self, storage_broker, buckets): """ ``renew_leases`` renews the leases of shares on all storage servers which have no more than the specified amount of time remaining on their @@ -445,18 +491,20 @@ class RenewLeasesTests(TestCase): # Make sure that the storage brokers have shares at the storage # indexes we're going to operate on. for storage_server in storage_broker.get_connected_servers(): - for node in nodes: - try: - create_shares( - storage_server.get_storage_server(), - node.get_storage_index(), - size=123, - lease_expiration=int(storage_broker.clock.seconds()), - ) - except SharesAlreadyExist: - # If Hypothesis already put some shares in this storage - # index, that's okay too. - pass + for (storage_index, shares) in buckets: + for sharenum, expiration_time in shares.items(): + try: + create_share( + storage_server.get_storage_server(), + storage_index, + sharenum, + size=123, + lease_expiration=int(expiration_time), + ) + except SharesAlreadyExist: + # If the storage_brokers() strategy already put a + # share at this location, that's okay too. + pass def get_now(): return datetime.utcfromtimestamp( @@ -464,8 +512,8 @@ class RenewLeasesTests(TestCase): ) def visit_assets(visit): - for node in nodes: - visit(node.get_storage_index()) + for storage_index, ignored in buckets: + visit(storage_index) return succeed(None) d = renew_leases( @@ -481,8 +529,6 @@ class RenewLeasesTests(TestCase): succeeded(Always()), ) - relevant_storage_indexes = set(node.get_storage_index() for node in nodes) - self.assertThat( list( server.get_storage_server() @@ -490,7 +536,7 @@ class RenewLeasesTests(TestCase): ), AllMatch( leases_current( - relevant_storage_indexes, + list(storage_index for (storage_index, ignored) in buckets), get_now(), min_lease_remaining, ) @@ -590,12 +636,14 @@ class MaintainLeasesFromRootTests(TestCase): for node in root_node.flatten(): for server in storage_broker.get_connected_servers(): try: - stat = server.get_storage_server().buckets[node.get_storage_index()] + shares = server.get_storage_server().buckets[ + node.get_storage_index() + ] except KeyError: continue else: - # DummyStorageServer always pretends to have only one share - expected.append([stat.size]) + if shares: + expected.append(list(stat.size for stat in shares.values())) # The visit order doesn't matter. expected.sort()