diff --git a/nix/sources.json b/nix/sources.json index de6daf54efcdfe37d9ed4d6584a8a56afdeb94cd..c3b118cf7953a45487530fab7ebd4c7ee6f0c5c7 100644 --- a/nix/sources.json +++ b/nix/sources.json @@ -3,12 +3,12 @@ "branch": "master", "description": "Create highly reproducible python environments", "homepage": "", - "owner": "PrivateStorageio", + "owner": "DavHau", "repo": "mach-nix", - "rev": "4d2cc6c", - "sha256": "03xabrwzbby6dcp3w4li7p9cxsca5n2jlz452sz7r4h1n5sx9mwg", + "rev": "dc94135e31d5c90c40a00a6cbdf9330526e8153b", + "sha256": "08l7v0hn9cs8irda0kd55c6lmph3an2i7p47wh2d48hza9pipckr", "type": "tarball", - "url": "https://github.com/PrivateStorageio/mach-nix/archive/4d2cc6c.tar.gz", + "url": "https://github.com/DavHau/mach-nix/archive/dc94135e31d5c90c40a00a6cbdf9330526e8153b.tar.gz", "url_template": "https://github.com/<owner>/<repo>/archive/<rev>.tar.gz" }, "niv": { @@ -42,15 +42,15 @@ "url_template": "https://releases.nixos.org/nixos/21.05/nixos-21.05.3740.ce7a1190a0f/nixexprs.tar.xz" }, "tahoe-lafs": { - "branch": "tahoe-lafs-1.16.0", + "branch": "master", "description": "The Tahoe-LAFS decentralized secure filesystem.", "homepage": "https://tahoe-lafs.org/", "owner": "tahoe-lafs", "repo": "tahoe-lafs", - "rev": "2742de6f7c1fa6cf77e35ecc5854bcf7db3e5963", - "sha256": "17bbkk21hfln6gw5lq29g0s3jzbmfwk3921w36shx09i8asfwn56", + "rev": "d3c6f58a8ded7db3324ef97c47f5c1921c3d58b7", + "sha256": "18zr6l53r32pigymsnv10m67kgf981bxl8c3rjhv5bikfnf986q8", "type": "tarball", - "url": "https://github.com/tahoe-lafs/tahoe-lafs/archive/2742de6f7c1fa6cf77e35ecc5854bcf7db3e5963.tar.gz", + "url": "https://github.com/tahoe-lafs/tahoe-lafs/archive/d3c6f58a8ded7db3324ef97c47f5c1921c3d58b7.tar.gz", "url_template": "https://github.com/<owner>/<repo>/archive/<rev>.tar.gz" } } diff --git a/src/_zkapauthorizer/_storage_client.py b/src/_zkapauthorizer/_storage_client.py index a52e18b49c2017662585ece9b81798cbf72d8c29..3ae35903b71b581052237858f01adccac117355e 100644 --- a/src/_zkapauthorizer/_storage_client.py +++ b/src/_zkapauthorizer/_storage_client.py @@ -424,6 +424,23 @@ class ZKAPAuthorizerStorageClient(object): # Read operations are free. num_passes = 0 + # Convert tw_vectors from the new internal format to the wire format. + # See https://github.com/tahoe-lafs/tahoe-lafs/pull/1127/files#r716939082 + tw_vectors = { + sharenum: ( + [ + (offset, length, "eq", specimen) + for (offset, length, specimen) in test_vector + ], + data_vectors, + new_length, + ) + for ( + sharenum, + (test_vector, data_vectors, new_length), + ) in tw_vectors.items() + } + if has_writes(tw_vectors): # When performing writes, if we're increasing the storage # requirement, we need to spend more passes. Unfortunately we diff --git a/src/_zkapauthorizer/_storage_server.py b/src/_zkapauthorizer/_storage_server.py index 1d533d90ddb8ba7ab229da0189c6ad41aed554a3..71088ab53cae6eb023e3ab502db6c681c7545f87 100644 --- a/src/_zkapauthorizer/_storage_server.py +++ b/src/_zkapauthorizer/_storage_server.py @@ -33,6 +33,9 @@ from struct import calcsize, unpack import attr from allmydata.interfaces import RIStorageServer from allmydata.storage.common import storage_index_to_dir +from allmydata.storage.immutable import ShareFile +from allmydata.storage.mutable import MutableShareFile +from allmydata.storage.shares import get_share_file from allmydata.util.base32 import b2a from attr.validators import instance_of, provides from challenge_bypass_ristretto import SigningKey, TokenPreimage, VerificationSignature @@ -284,6 +287,7 @@ class ZKAPAuthorizerStorageServer(Referenceable): ) def remote_stat_shares(self, storage_indexes_or_slots): + # type: (List[bytes]) -> List[Dict[int, ShareStat]] return list( dict(stat_share(self._original, storage_index_or_slot)) for storage_index_or_slot in storage_indexes_or_slots @@ -622,28 +626,13 @@ def get_storage_index_share_size(sharepath): return share_file_size - header_size - (number_of_leases * (4 + 32 + 32 + 4)) -def get_lease_expiration(get_leases, storage_index_or_slot): - """ - Get the lease expiration time for the shares in a bucket or slot, or None - if there is no lease on them. - - :param get_leases: A one-argument callable which returns the leases. - - :param storage_index_or_slot: Either a storage index or a slot identifying - the shares the leases of which to inspect. - """ - for lease in get_leases(storage_index_or_slot): - return lease.get_expiration_time() - return None - - def stat_bucket(storage_server, storage_index, sharepath): """ Get a ``ShareStat`` for the shares in a bucket. """ return ShareStat( size=get_storage_index_share_size(sharepath), - lease_expiration=get_lease_expiration(storage_server.get_leases, storage_index), + lease_expiration=get_lease_expiration(sharepath), ) @@ -653,10 +642,26 @@ def stat_slot(storage_server, slot, sharepath): """ return ShareStat( size=get_slot_share_size(sharepath), - lease_expiration=get_lease_expiration(storage_server.get_slot_leases, slot), + lease_expiration=get_lease_expiration(sharepath), ) +def get_lease_expiration(sharepath): + # type: (str) -> Optional[int] + """ + Get the latest lease expiration time for the share at the given path, or + ``None`` if there are no leases on it. + + :param sharepath: The path to the share file to inspect. + """ + leases = list( + lease.get_expiration_time() for lease in get_share_file(sharepath).get_leases() + ) + if leases: + return max(leases) + return None + + def get_slot_share_size(sharepath): """ Get the size of a share belonging to a slot (a mutable share). diff --git a/src/_zkapauthorizer/lease_maintenance.py b/src/_zkapauthorizer/lease_maintenance.py index 8858c91b85e42017a92131d41691cf5f69fd35fa..9fe3186224f334446e83e5ac829e47942d78a2e3 100644 --- a/src/_zkapauthorizer/lease_maintenance.py +++ b/src/_zkapauthorizer/lease_maintenance.py @@ -206,12 +206,31 @@ def renew_leases_on_server( # Keep track of what's been seen. activity.observe([stat.size for stat in stat_dict.values()]) - # All shares have the same lease information. - stat = stat_dict.popitem()[1] - if needs_lease_renew(min_lease_remaining, stat, now): + # Each share has its own leases and each lease has its own expiration + # time. For each share the server only returns the lease with the + # expiration time farthest in the future. + # + # There is no API for renewing leases on just *some* shares! It is + # all or nothing. So from the server's response we find the share + # that will have no active lease soonest and make our decision about + # whether to renew leases at this storage index or not based on that. + most_endangered = soonest_expiration(stat_dict.values()) + if needs_lease_renew(min_lease_remaining, most_endangered, now): yield renew_lease(renewal_secret, cancel_secret, storage_index, server) +def soonest_expiration(stats): + # type: (Iterable[ShareStat]) -> ShareStat + """ + :return: The share stat from ``stats`` with a lease which expires before + all others. + """ + return min( + stats, + key=lambda stat: stat.lease_expiration, + ) + + def renew_lease(renewal_secret, cancel_secret, storage_index, server): """ Renew the lease on the shares in one storage index on one server. diff --git a/src/_zkapauthorizer/tests/matchers.py b/src/_zkapauthorizer/tests/matchers.py index 4bfe7362bcfa96eb3198cb7ef6cc0ca3b0e9ea90..bf5ab30eeba9c029f70604c31952ed70597099cd 100644 --- a/src/_zkapauthorizer/tests/matchers.py +++ b/src/_zkapauthorizer/tests/matchers.py @@ -135,14 +135,17 @@ def leases_current(relevant_storage_indexes, now, min_lease_remaining): servers for which the leases on the given storage indexes do not expire before ``min_lease_remaining``. """ + + def get_relevant_stats(storage_server): + for (storage_index, shares) in storage_server.buckets.items(): + if storage_index in relevant_storage_indexes: + for (sharenum, stat) in shares.items(): + yield stat + return AfterPreprocessing( # Get share stats for storage indexes we should have # visited and maintained. - lambda storage_server: list( - stat - for (storage_index, stat) in storage_server.buckets.items() - if storage_index in relevant_storage_indexes - ), + lambda storage_server: list(get_relevant_stats(storage_server)), AllMatch( AfterPreprocessing( # Lease expiration for anything visited must be diff --git a/src/_zkapauthorizer/tests/test_lease_maintenance.py b/src/_zkapauthorizer/tests/test_lease_maintenance.py index 2a8743dda3fbf6298f316928f093dd3ce3df5b64..87345284ccf38f17272596dd71e1e6bf916bff6e 100644 --- a/src/_zkapauthorizer/tests/test_lease_maintenance.py +++ b/src/_zkapauthorizer/tests/test_lease_maintenance.py @@ -36,6 +36,8 @@ from hypothesis.strategies import ( just, lists, randoms, + sets, + tuples, ) from testtools import TestCase from testtools.matchers import ( @@ -64,7 +66,13 @@ from ..lease_maintenance import ( visit_storage_indexes_from_root, ) from .matchers import Provides, between, leases_current -from .strategies import clocks, leaf_nodes, node_hierarchies, storage_indexes +from .strategies import ( + clocks, + node_hierarchies, + posix_timestamps, + sharenums, + storage_indexes, +) def interval_means(): @@ -93,41 +101,40 @@ class DummyStorageServer(object): """ A dummy implementation of ``IStorageServer`` from Tahoe-LAFS. - :ivar dict[bytes, ShareStat] buckets: A mapping from storage index to + :ivar buckets: A mapping from storage index to metadata about shares at that storage index. """ clock = attr.ib() - buckets = attr.ib() + buckets = attr.ib() # type: Dict[bytes, Dict[int, ShareStat]] lease_seed = attr.ib() def stat_shares(self, storage_indexes): - return succeed( - list( - {0: self.buckets[idx]} if idx in self.buckets else {} - for idx in storage_indexes - ) - ) + # type: (List[bytes]) -> Deferred[List[Dict[int, ShareStat]]] + return succeed(list(self.buckets.get(idx, {}) for idx in storage_indexes)) def get_lease_seed(self): return self.lease_seed def add_lease(self, storage_index, renew_secret, cancel_secret): - self.buckets[storage_index].lease_expiration = ( - self.clock.seconds() + timedelta(days=31).total_seconds() - ) + for stat in self.buckets.get(storage_index, {}).values(): + stat.lease_expiration = ( + self.clock.seconds() + timedelta(days=31).total_seconds() + ) class SharesAlreadyExist(Exception): pass -def create_shares(storage_server, storage_index, size, lease_expiration): +def create_share(storage_server, storage_index, sharenum, size, lease_expiration): + # type: (DummyStorageServer, bytes, int, int, int) -> None """ - Initialize a storage index ("bucket") with some shares. + Add a share to a storage index ("bucket"). :param DummyServer storage_server: The server to populate with shares. :param bytes storage_index: The storage index of the shares. + :param sharenum: The share number to add. :param int size: The application data size of the shares. :param int lease_expiration: The expiration time for the lease to attach to the shares. @@ -137,11 +144,12 @@ def create_shares(storage_server, storage_index, size, lease_expiration): :return: ``None`` """ - if storage_index in storage_server.buckets: + if sharenum in storage_server.buckets.get(storage_index, {}): raise SharesAlreadyExist( "Cannot create shares for storage index where they already exist.", ) - storage_server.buckets[storage_index] = ShareStat( + bucket = storage_server.buckets.setdefault(storage_index, {}) + bucket[sharenum] = ShareStat( size=size, lease_expiration=lease_expiration, ) @@ -166,7 +174,7 @@ def storage_servers(clocks): return builds( DummyStorageServer, clocks, - dictionaries(storage_indexes(), share_stats()), + dictionaries(storage_indexes(), dictionaries(sharenums(), share_stats())), lease_seeds(), ).map( DummyServer, @@ -425,13 +433,51 @@ class VisitStorageIndexesFromRootTests(TestCase): ) +def lists_of_buckets(): + """ + Build lists of bucket descriptions. + + A bucket description is a two-tuple of a storage index and a dict mapping + share numbers to lease expiration times (as posix timestamps). Any given + storage index will appear only once in the overall result. + """ + + def add_expiration_times(sharenums): + return builds( + lambda nums, expires: dict(zip(nums, expires)), + just(sharenums), + lists( + posix_timestamps(), + min_size=len(sharenums), + max_size=len(sharenums), + ), + ) + + def buckets_strategy(count): + si_strategy = sets(storage_indexes(), min_size=count, max_size=count) + sharenum_strategy = lists( + sets(sharenums(), min_size=1).flatmap(add_expiration_times), + min_size=count, + max_size=count, + ) + expiration_strategy = lists + return builds( + zip, + si_strategy, + sharenum_strategy, + ) + + bucket_count_strategy = integers(min_value=0, max_value=100) + return bucket_count_strategy.flatmap(buckets_strategy) + + class RenewLeasesTests(TestCase): """ Tests for ``renew_leases``. """ - @given(storage_brokers(clocks()), lists(leaf_nodes(), unique=True)) - def test_renewed(self, storage_broker, nodes): + @given(storage_brokers(clocks()), lists_of_buckets()) + def test_renewed(self, storage_broker, buckets): """ ``renew_leases`` renews the leases of shares on all storage servers which have no more than the specified amount of time remaining on their @@ -445,18 +491,20 @@ class RenewLeasesTests(TestCase): # Make sure that the storage brokers have shares at the storage # indexes we're going to operate on. for storage_server in storage_broker.get_connected_servers(): - for node in nodes: - try: - create_shares( - storage_server.get_storage_server(), - node.get_storage_index(), - size=123, - lease_expiration=int(storage_broker.clock.seconds()), - ) - except SharesAlreadyExist: - # If Hypothesis already put some shares in this storage - # index, that's okay too. - pass + for (storage_index, shares) in buckets: + for sharenum, expiration_time in shares.items(): + try: + create_share( + storage_server.get_storage_server(), + storage_index, + sharenum, + size=123, + lease_expiration=int(expiration_time), + ) + except SharesAlreadyExist: + # If the storage_brokers() strategy already put a + # share at this location, that's okay too. + pass def get_now(): return datetime.utcfromtimestamp( @@ -464,8 +512,8 @@ class RenewLeasesTests(TestCase): ) def visit_assets(visit): - for node in nodes: - visit(node.get_storage_index()) + for storage_index, ignored in buckets: + visit(storage_index) return succeed(None) d = renew_leases( @@ -481,8 +529,6 @@ class RenewLeasesTests(TestCase): succeeded(Always()), ) - relevant_storage_indexes = set(node.get_storage_index() for node in nodes) - self.assertThat( list( server.get_storage_server() @@ -490,7 +536,7 @@ class RenewLeasesTests(TestCase): ), AllMatch( leases_current( - relevant_storage_indexes, + list(storage_index for (storage_index, ignored) in buckets), get_now(), min_lease_remaining, ) @@ -590,12 +636,14 @@ class MaintainLeasesFromRootTests(TestCase): for node in root_node.flatten(): for server in storage_broker.get_connected_servers(): try: - stat = server.get_storage_server().buckets[node.get_storage_index()] + shares = server.get_storage_server().buckets[ + node.get_storage_index() + ] except KeyError: continue else: - # DummyStorageServer always pretends to have only one share - expected.append([stat.size]) + if shares: + expected.append(list(stat.size for stat in shares.values())) # The visit order doesn't matter. expected.sort()