diff --git a/src/_zkapauthorizer/_storage_client.py b/src/_zkapauthorizer/_storage_client.py index b7a94f3e78e7aa0cb42e1085ec7cf074a783c24a..fae88f4a79e23ec83784974c9a45db7e55f60012 100644 --- a/src/_zkapauthorizer/_storage_client.py +++ b/src/_zkapauthorizer/_storage_client.py @@ -190,7 +190,7 @@ class ZKAPAuthorizerStorageClient(object): # of the current size of all of the specified shares (keys of # tw_vectors). current_sizes = yield self._rref.callRemote( - "slot_share_sizes", + "share_sizes", storage_index, set(tw_vectors), ) diff --git a/src/_zkapauthorizer/_storage_server.py b/src/_zkapauthorizer/_storage_server.py index 5390b995be0177730f56b146726e4cc97a51da76..4f90e1948c40fe3de6f76bf317434033a64037e1 100644 --- a/src/_zkapauthorizer/_storage_server.py +++ b/src/_zkapauthorizer/_storage_server.py @@ -24,6 +24,10 @@ from __future__ import ( absolute_import, ) +from struct import ( + unpack, +) + from errno import ( ENOENT, ) @@ -37,9 +41,10 @@ from os.path import ( ) from os import ( listdir, - stat, ) - +from datetime import ( + timedelta, +) import attr from attr.validators import ( provides, @@ -136,6 +141,8 @@ class ZKAPAuthorizerStorageServer(Referenceable): A class which wraps an ``RIStorageServer`` to insert pass validity checks before allowing certain functionality. """ + LEASE_PERIOD = timedelta(days=31) + _original = attr.ib(validator=provides(RIStorageServer)) _signing_key = attr.ib(validator=instance_of(SigningKey)) _clock = attr.ib( @@ -176,12 +183,14 @@ class ZKAPAuthorizerStorageServer(Referenceable): :return list[bytes]: The passes which are found to be valid. """ - return list( + result = list( pass_ for pass_ in passes if not self._is_invalid_pass(message, pass_) ) + # print("{}: {} passes, {} valid".format(message, len(passes), len(result))) + return result def remote_get_version(self): """ @@ -222,7 +231,17 @@ class ZKAPAuthorizerStorageServer(Referenceable): Pass-through after a pass check to ensure clients can only extend the duration of share storage if they present valid passes. """ - self._validate_passes(add_lease_message(storage_index), passes) + # print("server add_lease({}, {!r})".format(len(passes), storage_index)) + valid_passes = self._validate_passes(add_lease_message(storage_index), passes) + allocated_sizes = dict( + get_share_sizes( + self._original, storage_index, + list(get_all_share_numbers(self._original, storage_index)), + ), + ).values() + # print("allocated_sizes: {}".format(allocated_sizes)) + check_pass_quantity(len(valid_passes), allocated_sizes) + # print("Checked out") return self._original.remote_add_lease(storage_index, *a, **kw) def remote_renew_lease(self, passes, storage_index, *a, **kw): @@ -240,9 +259,9 @@ class ZKAPAuthorizerStorageServer(Referenceable): """ return self._original.remote_advise_corrupt_share(*a, **kw) - def remote_slot_share_sizes(self, storage_index, sharenums): + def remote_share_sizes(self, storage_index_or_slot, sharenums): return dict( - get_slot_share_sizes(self._original, storage_index, sharenums) + get_share_sizes(self._original, storage_index_or_slot, sharenums) ) def remote_slot_testv_and_readv_and_writev( @@ -276,11 +295,12 @@ class ZKAPAuthorizerStorageServer(Referenceable): ) if has_active_lease(self._original, storage_index, self._clock.seconds()): # Some of the storage is paid for already. - current_sizes = dict(get_slot_share_sizes( + current_sizes = dict(get_share_sizes( self._original, storage_index, tw_vectors.keys(), )) + # print("has writes, has active lease, current sizes: {}".format(current_sizes)) else: # None of it is. current_sizes = {} @@ -335,6 +355,14 @@ def has_active_lease(storage_server, storage_index, now): ) +def check_pass_quantity(valid_count, share_sizes): + required_pass_count = required_passes(BYTES_PER_PASS, share_sizes) + if valid_count < required_pass_count: + raise MorePassesRequired( + valid_count, + required_pass_count, + ) + def check_pass_quantity_for_write(valid_count, sharenums, allocated_size): """ Determine if the given number of valid passes is sufficient for an @@ -349,35 +377,10 @@ def check_pass_quantity_for_write(valid_count, sharenums, allocated_size): :return: ``None`` if the number of valid passes given is sufficient. """ - required_pass_count = required_passes(BYTES_PER_PASS, [allocated_size] * len(sharenums)) - # print("valid_count = {}".format(valid_count)) - # print("sharenums = {}".format(len(sharenums))) - # print("allocated size = {}".format(allocated_size)) - # print("required_pass_count = {}".format(required_pass_count)) - if valid_count < required_pass_count: - raise MorePassesRequired( - valid_count, - required_pass_count, - ) - - -def get_slot_share_sizes(storage_server, storage_index, sharenums): - """ - Retrieve the on-disk storage committed to the given shares in the given - storage index. - - :param allmydata.storage.server.StorageServer storage_server: The storage - server which owns the on-disk storage. + check_pass_quantity(valid_count, [allocated_size] * len(sharenums)) - :param bytes storage_index: The storage index to inspect. - :param list[int] sharenums: The share numbers to consider. - - :return generator[(int, int)]: Pairs of share number, bytes on disk of the - given shares. Note this is naive with respect to filesystem features - like compression or sparse files. It is just the size reported by the - filesystem. - """ +def get_all_share_paths(storage_server, storage_index): bucket = join(storage_server.sharedir, storage_index_to_dir(storage_index)) try: contents = listdir(bucket) @@ -392,28 +395,44 @@ def get_slot_share_sizes(storage_server, storage_index, sharenums): except ValueError: pass else: - if sharenum in sharenums: - try: - metadata = stat(join(bucket, candidate)) - except Exception as e: - print(e) + yield sharenum, join(bucket, candidate) + + +def get_all_share_numbers(storage_server, storage_index): + for sharenum, sharepath in get_all_share_paths(storage_server, storage_index): + yield sharenum + + +def get_share_sizes(storage_server, storage_index_or_slot, sharenums): + """ + Get the sizes of the given share numbers for the given storage index *or* + slot. + """ + get_size = None + for sharenum, sharepath in get_all_share_paths(storage_server, storage_index_or_slot): + if get_size is None: + # Figure out if it is a storage index or a slot. + with open(sharepath) as share_file: + magic = share_file.read(32) + if magic == "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e": + get_size = get_slot_share_size else: - # Compared to calculating how much *user* data we're - # storing, the on-disk file is larger by at *least* - # SLOT_HEADER_SIZE. There is also a variable sized - # trailer which is harder to compute but which is at least - # LEASE_TRAILER_SIZE. Fortunately it's often exactly - # LEASE_TRAILER_SIZE so I'm just going to ignore it for - # now. - # - # By measuring that the slots are larger than the data the - # user is storing we'll overestimate how many passes are - # required right around the boundary between two costs. - # Oops. - yield ( - sharenum, - metadata.st_size - SLOT_HEADER_SIZE - LEASE_TRAILER_SIZE, - ) + get_size = get_storage_index_share_size + yield sharenum, get_size(sharepath) + + +def get_storage_index_share_size(sharepath): + with open(sharepath) as share_file: + share_data_length_bytes = share_file.read(8)[4:] + (share_data_length,) = unpack('>L', share_data_length_bytes) + return share_data_length + + +def get_slot_share_size(sharepath): + with open(sharepath) as share_file: + share_data_length_bytes = share_file.read(92)[-8:] + (share_data_length,) = unpack('>Q', share_data_length_bytes) + return share_data_length # I don't understand why this is required. diff --git a/src/_zkapauthorizer/foolscap.py b/src/_zkapauthorizer/foolscap.py index eb70f11711ec7915c8dbc6f3c0a8c7ba47c797d1..0b704b7a571d9caf919049c3d251e64d018a8a3e 100644 --- a/src/_zkapauthorizer/foolscap.py +++ b/src/_zkapauthorizer/foolscap.py @@ -122,16 +122,13 @@ class RIPrivacyPassAuthorizedStorageServer(RemoteInterface): get_buckets = RIStorageServer["get_buckets"] - def slot_share_sizes( - storage_index=StorageIndex, + def share_sizes( + storage_index_or_slot=StorageIndex, sharenums=SetOf(int, maxLength=MAX_BUCKETS), ): """ - Get the size of the given shares in the given storage index. If a share - has no stored state, its size is reported as 0. - - The reported size may be larger than the actual share size if there - are more than four leases on the share. + Get the size of the given shares in the given storage index or slot. If a + share has no stored state, its size is reported as 0. """ return DictOf(int, Offset) diff --git a/src/_zkapauthorizer/storage_common.py b/src/_zkapauthorizer/storage_common.py index d1e7240d3c3f97f86d577588017512664e189b38..a41f40a89a59c5212b8a7b8187739577dc12454a 100644 --- a/src/_zkapauthorizer/storage_common.py +++ b/src/_zkapauthorizer/storage_common.py @@ -59,11 +59,13 @@ def required_passes(bytes_per_pass, share_sizes): :return int: The number of passes required to cover the storage cost. """ - return int( + result = int( ceil( sum(share_sizes, 0) / bytes_per_pass, ), ) + # print("required_passes({}, {}) == {}".format(bytes_per_pass, share_sizes, result)) + return result def has_writes(tw_vectors): @@ -112,21 +114,26 @@ def get_allocated_size(tw_vectors): ) -def get_implied_data_length(data_vector): +def get_implied_data_length(data_vector, new_length): """ :param data_vector: See ``allmydata.interfaces.DataVector``. :return int: The amount of data, in bytes, implied by a data vector and a size. """ - return max( + data_based_size = max( offset + len(data) for (offset, data) in data_vector ) if data_vector else 0 + if new_length is None: + return data_based_size + # new_length is only allowed to truncate, not expand. + return min(new_length, data_based_size) def get_required_new_passes_for_mutable_write(current_sizes, tw_vectors): + # print("get_required_new_passes_for_mutable_write({}, {})".format(current_sizes, summarize(tw_vectors))) current_passes = required_passes( BYTES_PER_PASS, current_sizes.values(), @@ -134,7 +141,7 @@ def get_required_new_passes_for_mutable_write(current_sizes, tw_vectors): new_sizes = current_sizes.copy() size_updates = { - sharenum: get_implied_data_length(data_vector) + sharenum: get_implied_data_length(data_vector, new_length) for (sharenum, (_, data_vector, new_length)) in tw_vectors.items() } @@ -150,7 +157,22 @@ def get_required_new_passes_for_mutable_write(current_sizes, tw_vectors): required_new_passes = new_passes - current_passes # print("Current sizes: {}".format(current_sizes)) - # print("Current passeS: {}".format(current_passes)) + # print("Current passes: {}".format(current_passes)) # print("New sizes: {}".format(new_sizes)) # print("New passes: {}".format(new_passes)) return required_new_passes + +def summarize(tw_vectors): + return { + sharenum: ( + test_vector, + list( + (offset, len(data)) + for (offset, data) + in data_vectors + ), + new_length, + ) + for (sharenum, (test_vector, data_vectors, new_length)) + in tw_vectors.items() + } diff --git a/src/_zkapauthorizer/tests/test_storage_server.py b/src/_zkapauthorizer/tests/test_storage_server.py index cde2b7c7498d67ae96a7337aab28afe5332cae10..fa05fe4a781c744fee53b81e3938c86673b2414f 100644 --- a/src/_zkapauthorizer/tests/test_storage_server.py +++ b/src/_zkapauthorizer/tests/test_storage_server.py @@ -21,6 +21,9 @@ from __future__ import ( division, ) +from time import ( + time, +) from random import ( shuffle, ) @@ -30,12 +33,12 @@ from testtools import ( from testtools.matchers import ( Equals, AfterPreprocessing, + MatchesStructure, raises, ) from hypothesis import ( given, note, - # reproduce_failure, ) from hypothesis.strategies import ( integers, @@ -46,6 +49,11 @@ from privacypass import ( RandomToken, random_signing_key, ) + +from twisted.internet.task import ( + Clock, +) + from foolscap.referenceable import ( LocalReferenceable, ) @@ -55,6 +63,8 @@ from .privacypass import ( ) from .strategies import ( zkaps, + sizes, + sharenum_sets, storage_indexes, write_enabler_secrets, lease_renew_secrets, @@ -66,6 +76,7 @@ from .fixtures import ( ) from .storage_common import ( cleanup_storage_server, + write_toy_shares, ) from ..api import ( ZKAPAuthorizerStorageServer, @@ -73,25 +84,32 @@ from ..api import ( ) from ..storage_common import ( BYTES_PER_PASS, + required_passes, allocate_buckets_message, + add_lease_message, slot_testv_and_readv_and_writev_message, get_implied_data_length, get_required_new_passes_for_mutable_write, ) - class PassValidationTests(TestCase): """ Tests for pass validation performed by ``ZKAPAuthorizerStorageServer``. """ def setUp(self): super(PassValidationTests, self).setUp() + self.clock = Clock() + # anonymous_storage_server uses time.time() so get our Clock close to + # the same time so we can do lease expiration calculations more + # easily. + self.clock.advance(time()) self.anonymous_storage_server = self.useFixture(AnonymousStorageServer()).storage_server self.signing_key = random_signing_key() self.storage_server = ZKAPAuthorizerStorageServer( self.anonymous_storage_server, self.signing_key, + self.clock, ) @given(integers(min_value=0, max_value=64), lists(zkaps(), max_size=64)) @@ -229,19 +247,22 @@ class PassValidationTests(TestCase): in test_and_write_vectors_for_shares.items() } - note("tw_vectors summarized: {}".format({ - sharenum: ( - test_vector, - list( - (offset, len(data)) - for (offset, data) - in data_vectors - ), - new_length, - ) - for (sharenum, (test_vector, data_vectors, new_length)) - in tw_vectors.items() - })) + def summarize(tw_vectors): + return { + sharenum: ( + test_vector, + list( + (offset, len(data)) + for (offset, data) + in data_vectors + ), + new_length, + ) + for (sharenum, (test_vector, data_vectors, new_length)) + in tw_vectors.items() + } + + note("tw_vectors summarized: {}".format(summarize(tw_vectors))) # print("test suite") required_pass_count = get_required_new_passes_for_mutable_write( @@ -277,18 +298,17 @@ class PassValidationTests(TestCase): "Server denied initial write.", ) - # Find the largest sharenum so we can make it even larger. - sharenum = max( - tw_vectors.keys(), - key=lambda k: get_implied_data_length(tw_vectors[k][1]), - ) + # Pick any share to make larger. + sharenum = next(iter(tw_vectors)) _, data_vector, new_length = tw_vectors[sharenum] - current_length = get_implied_data_length(data_vector) + current_length = get_implied_data_length(data_vector, new_length) new_tw_vectors = { sharenum: make_data_vector(current_length), } + note("new tw_vectors: {}".format(summarize(new_tw_vectors))) + do_extend = lambda: self.storage_server.doRemoteCall( "slot_testv_and_readv_and_writev", (), @@ -305,40 +325,15 @@ class PassValidationTests(TestCase): result = do_extend() except MorePassesRequired as e: self.assertThat( - e.required_count, - Equals(1), + e, + MatchesStructure( + valid_count=Equals(0), + required_count=Equals(1), + ), ) else: self.fail("expected MorePassesRequired, got {}".format(result)) - # @reproduce_failure('4.7.3', 'AXicY2CgMWAEQijr/39GRjCn+D+QxwQX72FgAABQ4QQI') - # @given( - # storage_index=storage_indexes(), - # secrets=tuples( - # write_enabler_secrets(), - # lease_renew_secrets(), - # lease_cancel_secrets(), - # ), - # test_and_write_vectors_for_shares=test_and_write_vectors_for_shares(), - # ) - # def test_extend_mutable_with_new_length_fails_without_passes(self, storage_index, secrets, test_and_write_vectors_for_shares): - # """ - # If ``remote_slot_testv_and_readv_and_writev`` is invoked to increase - # storage usage by supplying a ``new_length`` greater than the current - # share size and without supplying passes, the operation fails with - # ``MorePassesRequired``. - # """ - # return self._test_extend_mutable_fails_without_passes( - # storage_index, - # secrets, - # test_and_write_vectors_for_shares, - # lambda current_length: ( - # [], - # [], - # current_length + BYTES_PER_PASS, - # ), - # ) - @given( storage_index=storage_indexes(), secrets=tuples( @@ -364,3 +359,192 @@ class PassValidationTests(TestCase): None, ), ) + + @given( + storage_index=storage_indexes(), + secrets=tuples( + lease_renew_secrets(), + lease_cancel_secrets(), + ), + sharenums=sharenum_sets(), + allocated_size=sizes(), + ) + def test_add_lease_fails_without_passes(self, storage_index, secrets, sharenums, allocated_size): + """ + If ``remote_add_lease`` is invoked without supplying enough passes to + cover the storage for all shares on the given storage index, the + operation fails with ``MorePassesRequired``. + """ + # hypothesis causes our storage server to be used many times. Clean + # up between iterations. + cleanup_storage_server(self.anonymous_storage_server) + + renew_secret, cancel_secret = secrets + + required_count = required_passes(BYTES_PER_PASS, [allocated_size] * len(sharenums)) + # Create some shares at a slot which will require lease renewal. + write_toy_shares( + self.anonymous_storage_server, + storage_index, + renew_secret, + cancel_secret, + sharenums, + allocated_size, + LocalReferenceable(None), + ) + + # Advance time to a point where the lease is expired. This simplifies + # the logic behind how many passes will be required by the add_leases + # call (all of them). If there is prorating for partially expired + # leases then the calculation for a non-expired lease involves more + # work. + # + # Add some slop here because time.time() is used by some parts of the + # system. :/ + self.clock.advance(self.storage_server.LEASE_PERIOD.total_seconds() + 10.0) + + # Attempt to take out a new lease with one fewer pass than is + # required. + passes = make_passes( + self.signing_key, + add_lease_message(storage_index), + list(RandomToken.create() for i in range(required_count - 1)), + ) + # print("tests add_lease({}, {!r})".format(len(passes), storage_index)) + try: + result = self.storage_server.doRemoteCall( + "add_lease", ( + passes, + storage_index, + renew_secret, + cancel_secret, + ), + {}, + ) + except MorePassesRequired as e: + self.assertThat( + e.valid_count, + Equals(len(passes)), + ) + self.assertThat( + e.required_count, + Equals(required_count), + ) + else: + self.fail("Expected MorePassesRequired, got {}".format(result)) + + + @given( + storage_index=storage_indexes(), + secrets=tuples( + lease_renew_secrets(), + lease_cancel_secrets(), + ), + sharenums=sharenum_sets(), + allocated_size=sizes(), + ) + def test_immutable_share_sizes(self, storage_index, secrets, sharenums, allocated_size): + """ + ``share_sizes`` returns the size of the requested shares in the requested + storage_index + """ + # hypothesis causes our storage server to be used many times. Clean + # up between iterations. + cleanup_storage_server(self.anonymous_storage_server) + + renew_secret, cancel_secret = secrets + write_toy_shares( + self.anonymous_storage_server, + storage_index, + renew_secret, + cancel_secret, + sharenums, + allocated_size, + LocalReferenceable(None), + ) + + actual_sizes = self.storage_server.doRemoteCall( + "share_sizes", ( + storage_index, + sharenums, + ), + {}, + ) + self.assertThat( + actual_sizes, + Equals({ + sharenum: allocated_size + for sharenum + in sharenums + }), + ) + + @given( + slot=storage_indexes(), + secrets=tuples( + write_enabler_secrets(), + lease_renew_secrets(), + lease_cancel_secrets(), + ), + sharenums=sharenum_sets(), + test_and_write_vectors_for_shares=test_and_write_vectors_for_shares(), + ) + def test_mutable_share_sizes(self, slot, secrets, sharenums, test_and_write_vectors_for_shares): + # hypothesis causes our storage server to be used many times. Clean + # up between iterations. + cleanup_storage_server(self.anonymous_storage_server) + + tw_vectors = { + k: v.for_call() + for (k, v) + in test_and_write_vectors_for_shares.items() + } + + # Create an initial share to toy with. + required_pass_count = get_required_new_passes_for_mutable_write( + dict.fromkeys(tw_vectors.keys(), 0), + tw_vectors, + ) + valid_passes = make_passes( + self.signing_key, + slot_testv_and_readv_and_writev_message(slot), + list( + RandomToken.create() + for i + in range(required_pass_count) + ), + ) + test, read = self.storage_server.doRemoteCall( + "slot_testv_and_readv_and_writev", + (), + dict( + passes=valid_passes, + storage_index=slot, + secrets=secrets, + tw_vectors=tw_vectors, + r_vector=[], + ), + ) + self.assertThat( + test, + Equals(True), + "Server denied initial write.", + ) + + expected_sizes = { + sharenum: get_implied_data_length(data_vector, new_length) + for (sharenum, (testv, data_vector, new_length)) + in tw_vectors.items() + } + + actual_sizes = self.storage_server.doRemoteCall( + "share_sizes", ( + slot, + sharenums, + ), + {}, + ) + self.assertThat( + actual_sizes, + Equals(expected_sizes), + )