diff --git a/src/_zkapauthorizer/_storage_server.py b/src/_zkapauthorizer/_storage_server.py index b95eb3b3633c5336211b7f4fb9e9f8e865128849..061d6136125bc20877a4624e97d806e58440e86c 100644 --- a/src/_zkapauthorizer/_storage_server.py +++ b/src/_zkapauthorizer/_storage_server.py @@ -28,6 +28,14 @@ from functools import ( partial, ) +from os.path import ( + join, +) +from os import ( + listdir, + stat, +) + import attr from attr.validators import ( provides, @@ -47,6 +55,9 @@ from foolscap.ipb import ( from allmydata.interfaces import ( RIStorageServer, ) +from allmydata.storage.common import ( + storage_index_to_dir, +) from privacypass import ( TokenPreimage, VerificationSignature, @@ -71,6 +82,9 @@ from .storage_common import ( renew_lease_message, slot_testv_and_readv_and_writev_message, has_writes, + get_sharenums, + get_allocated_size, + get_implied_data_length, ) class MorePassesRequired(Exception): @@ -239,23 +253,36 @@ class ZKAPAuthorizerStorageServer(Referenceable): renew_leases = False if has_writes(tw_vectors): - # Writes are allowed to shares with active leases. - if not has_active_lease( - self._original, - storage_index, - self._clock.seconds(), - ): - # Passes may be supplied with the write to create the - # necessary lease as part of the same operation. This must be - # supported because there is no separate protocol action to - # *create* a slot. Clients just begin writing to it. - valid_passes = self._validate_passes( - slot_testv_and_readv_and_writev_message(storage_index), - passes, + # Passes may be supplied with the write to create the + # necessary lease as part of the same operation. This must be + # supported because there is no separate protocol action to + # *create* a slot. Clients just begin writing to it. + valid_passes = self._validate_passes( + slot_testv_and_readv_and_writev_message(storage_index), + passes, + ) + if has_active_lease(self._original, storage_index, self._clock.seconds()): + current_length = get_slot_share_size(self._original, storage_index, tw_vectors.keys()) + new_length = sum( + ( + get_implied_data_length(data_vector, new_length) + for (_, data_vector, new_length) + in tw_vectors.values() + ), + 0, ) + required_new_passes = ( + required_passes(BYTES_PER_PASS, {0}, new_length) + - required_passes(BYTES_PER_PASS, {0}, current_length) + ) + if required_new_passes > len(valid_passes): + raise MorePassesRequired(len(valid_passes), required_new_passes) + else: check_pass_quantity_for_mutable_write(len(valid_passes), tw_vectors) renew_leases = True + + # Skip over the remotely exposed method and jump to the underlying # implementation which accepts one additional parameter that we know # about (and don't expose over the network): renew_leases. We always @@ -277,38 +304,6 @@ class ZKAPAuthorizerStorageServer(Referenceable): return self._original.remote_slot_readv(*a, **kw) -def get_sharenums(tw_vectors): - """ - :param tw_vectors: See - ``allmydata.interfaces.TestAndWriteVectorsForShares``. - - :return set[int]: The share numbers which the given test/write vectors would write to. - """ - return set( - sharenum - for (sharenum, (test, data, new_length)) - in tw_vectors.items() - if data - ) - - -def get_allocated_size(tw_vectors): - """ - :param tw_vectors: See - ``allmydata.interfaces.TestAndWriteVectorsForShares``. - - :return int: The largest position ``tw_vectors`` writes in any share. - """ - return max( - list( - max(offset + len(s) for (offset, s) in data) - for (sharenum, (test, data, new_length)) - in tw_vectors.items() - if data - ), - ) - - def has_active_lease(storage_server, storage_index, now): """ :param allmydata.storage.server.StorageServer storage_server: A storage @@ -367,6 +362,41 @@ def check_pass_quantity_for_mutable_write(valid_count, tw_vectors): check_pass_quantity_for_write(valid_count, sharenums, allocated_size) +def get_slot_share_size(storage_server, storage_index, sharenums): + """ + Total the on-disk storage committed to the given shares in the given + storage index. + + :param allmydata.storage.server.StorageServer storage_server: The storage + server which owns the on-disk storage. + + :param bytes storage_index: The storage index to inspect. + + :param list[int] sharenums: The share numbers to consider. + + :return int: The number of bytes the given shares use on disk. Note this + is naive with respect to filesystem features like compression or + sparse files. It is just a total of the size reported by the + filesystem. + """ + total = 0 + bucket = join(storage_server.sharedir, storage_index_to_dir(storage_index)) + for candidate in listdir(bucket): + try: + sharenum = int(candidate) + except ValueError: + pass + else: + if sharenum in sharenums: + try: + metadata = stat(join(bucket, candidate)) + except Exception as e: + print(e) + else: + total += metadata.st_size + return total + + # I don't understand why this is required. # ZKAPAuthorizerStorageServer is-a Referenceable. It seems like # the built in adapter should take care of this case. diff --git a/src/_zkapauthorizer/storage_common.py b/src/_zkapauthorizer/storage_common.py index 5baacb22be69bb3e3473c3e7b8b22a7994320845..0bdee118a746f5d69ea59e8a4fe47b1826c12eb4 100644 --- a/src/_zkapauthorizer/storage_common.py +++ b/src/_zkapauthorizer/storage_common.py @@ -79,3 +79,57 @@ def has_writes(tw_vectors): for (test, data, new_length) in tw_vectors.values() ) + + +def get_sharenums(tw_vectors): + """ + :param tw_vectors: See + ``allmydata.interfaces.TestAndWriteVectorsForShares``. + + :return set[int]: The share numbers which the given test/write vectors would write to. + """ + return set( + sharenum + for (sharenum, (test, data, new_length)) + in tw_vectors.items() + if data + ) + + +def get_allocated_size(tw_vectors): + """ + :param tw_vectors: See + ``allmydata.interfaces.TestAndWriteVectorsForShares``. + + :return int: The largest position ``tw_vectors`` writes in any share. + """ + return max( + list( + max(offset + len(s) for (offset, s) in data) + for (sharenum, (test, data, new_length)) + in tw_vectors.items() + if data + ), + ) + + +def get_implied_data_length(data_vector, length): + """ + :param data_vector: See ``allmydata.interfaces.DataVector``. + + :param length: ``None`` or an overriding value for the length of the data. + This corresponds to the *new length* in + ``allmydata.interfaces.TestAndWriteVectorsForShares``. It may be + smaller than the result would be considering only ``data_vector`` if + there is a trunctation or larger if there is a zero-filled extension. + + :return int: The amount of data, in bytes, implied by a data vector and a + size. + """ + if length is None: + return max( + offset + len(data) + for (offset, data) + in data_vector + ) + return length diff --git a/src/_zkapauthorizer/tests/test_storage_server.py b/src/_zkapauthorizer/tests/test_storage_server.py index 82e7eb43acfa0dc40c0f15d502e02da712fa4610..055c43d11abcde2317112eb0b5af837974e0d5ff 100644 --- a/src/_zkapauthorizer/tests/test_storage_server.py +++ b/src/_zkapauthorizer/tests/test_storage_server.py @@ -57,6 +57,7 @@ from .strategies import ( write_enabler_secrets, lease_renew_secrets, lease_cancel_secrets, + test_and_write_vectors_for_shares, ) from .fixtures import ( AnonymousStorageServer, @@ -71,6 +72,12 @@ from ..api import ( from ..storage_common import ( BYTES_PER_PASS, allocate_buckets_message, + slot_testv_and_readv_and_writev_message, + required_passes, + get_sharenums, + get_allocated_size, + get_implied_data_length, + ) @@ -195,5 +202,93 @@ class PassValidationTests(TestCase): else: self.fail("expected MorePassesRequired, got {}".format(result)) - # TODO - # a write that increases the storage cost of the share requires passes too + @given( + storage_index=storage_indexes(), + secrets=tuples( + write_enabler_secrets(), + lease_renew_secrets(), + lease_cancel_secrets(), + ), + test_and_write_vectors_for_shares=test_and_write_vectors_for_shares(), + ) + def test_extend_mutable_fails_without_passes(self, storage_index, secrets, test_and_write_vectors_for_shares): + """ + If ``remote_slot_testv_and_readv_and_writev`` is invoked to increase the + storage used by shares without supplying passes, the operation fails + with ``MorePassesRequired``. + """ + # Hypothesis causes our storage server to be used many times. Clean + # up between iterations. + cleanup_storage_server(self.anonymous_storage_server) + + tw_vectors = { + k: v.for_call() + for (k, v) + in test_and_write_vectors_for_shares.items() + } + sharenums = get_sharenums(tw_vectors) + allocated_size = get_allocated_size(tw_vectors) + valid_passes = make_passes( + self.signing_key, + slot_testv_and_readv_and_writev_message(storage_index), + list( + RandomToken.create() + for i + in range(required_passes(BYTES_PER_PASS, sharenums, allocated_size)) + ), + ) + + # Create an initial share to toy with. + test, read = self.storage_server.doRemoteCall( + "slot_testv_and_readv_and_writev", + (), + dict( + passes=valid_passes, + storage_index=storage_index, + secrets=secrets, + tw_vectors=tw_vectors, + r_vector=[], + ), + ) + self.assertThat( + test, + Equals(True), + "Server denied initial write.", + ) + + # Try to grow one of the shares by BYTES_PER_PASS which should cost 1 + # pass. + sharenum = sorted(tw_vectors.keys())[0] + _, data_vector, new_length = tw_vectors[sharenum] + current_length = get_implied_data_length(data_vector, new_length) + + do_extend = lambda: self.storage_server.doRemoteCall( + "slot_testv_and_readv_and_writev", + (), + dict( + passes=[], + storage_index=storage_index, + secrets=secrets, + tw_vectors={ + sharenum: ( + [], + # Do it by writing past the end. Another thing we + # could do is just specify new_length with a large + # enough value. + [(current_length, b"x" * BYTES_PER_PASS)], + None, + ), + }, + r_vector=[], + ), + ) + + try: + result = do_extend() + except MorePassesRequired as e: + self.assertThat( + e.required_count, + Equals(1), + ) + else: + self.fail("expected MorePassesRequired, got {}".format(result))