diff --git a/src/_zkapauthorizer/_storage_server.py b/src/_zkapauthorizer/_storage_server.py index d208ce3155d366aea4458bd2c7c16f6b18296d14..8f883716a05dec68497317a914615c4e1e3d944c 100644 --- a/src/_zkapauthorizer/_storage_server.py +++ b/src/_zkapauthorizer/_storage_server.py @@ -35,12 +35,14 @@ from allmydata.interfaces import RIStorageServer from allmydata.storage.common import storage_index_to_dir from allmydata.storage.immutable import ShareFile from allmydata.storage.mutable import MutableShareFile +from allmydata.storage.lease import LeaseInfo from allmydata.storage.shares import get_share_file from allmydata.util.base32 import b2a from attr.validators import instance_of, provides from challenge_bypass_ristretto import SigningKey, TokenPreimage, VerificationSignature from eliot import start_action, log_call from foolscap.api import Referenceable +from twisted.python.filepath import FilePath from twisted.internet.defer import Deferred from twisted.internet.interfaces import IReactorTime from twisted.python.reflect import namedAny @@ -52,7 +54,6 @@ from .storage_common import ( add_lease_message, allocate_buckets_message, get_required_new_passes_for_mutable_write, - has_writes, pass_value_attribute, required_passes, slot_testv_and_readv_and_writev_message, @@ -305,12 +306,25 @@ class ZKAPAuthorizerStorageServer(Referenceable): r_vector, ): """ - Pass-through after a pass check to ensure clients can only allocate - storage for mutable shares if they present valid passes. + Perform a test-and-set on a number of shares in a given slot. - :note: This method can be used both to allocate storage and to rewrite - data in already-allocated storage. These cases may not be the - same from the perspective of pass validation. + Optionally, also read some data to be returned before writing any + changes. + + If storage-time will be allocated by the operation then validate the + given passes and ensure they are of sufficient quantity to pay for the + allocation. + + Specifically, passes are required in the following cases: + + * If shares are created then a lease is added to them. + Passes are required for the full size of the share. + + * If shares without unexpired leases are written then a lease is added to them. + Passes are required for the full size of the shares after the write. + + * If shares with unexpired leases are made larger. + Passes are required for the difference in price between the old and new size. """ with start_action( action_type=u"zkapauthorizer:storage-server:remote:slot-testv-and-readv-and-writev", @@ -336,58 +350,50 @@ class ZKAPAuthorizerStorageServer(Referenceable): tw_vectors, r_vector, ): - # Only writes to shares without an active lease will result in a lease - # renewal. - renew_leases = False - - if has_writes(tw_vectors): - # Passes may be supplied with the write to create the - # necessary lease as part of the same operation. This must be - # supported because there is no separate protocol action to - # *create* a slot. Clients just begin writing to it. - validation = _ValidationResult.validate_passes( - slot_testv_and_readv_and_writev_message(storage_index), - passes, - self._signing_key, - ) - if has_active_lease(self._original, storage_index, self._clock.seconds()): - # Some of the storage is paid for already. - current_sizes = dict( - get_share_sizes( - self._original, - storage_index, - # Consider the size of *all* shares even if they're - # not being written. If they have an unexpired lease - # then we can apply some or all of the remainder of - # the value of that lease towards this operation. - sharenums=None, - ) - ) - # print("has writes, has active lease, current sizes: {}".format(current_sizes)) - else: - # None of it is. - current_sizes = {} - renew_leases = True + # Get a stable time to use for all lease expiration checks that are + # part of this call. + now = self._clock.seconds() - required_new_passes = get_required_new_passes_for_mutable_write( - self._pass_value, - current_sizes, - tw_vectors, - ) - if required_new_passes > len(validation.valid): - validation.raise_for(required_new_passes) + # Check passes for cryptographic validity. + validation = _ValidationResult.validate_passes( + slot_testv_and_readv_and_writev_message(storage_index), + passes, + self._signing_key, + ) - # Skip over the remotely exposed method and jump to the underlying - # implementation which accepts one additional parameter that we know - # about (and don't expose over the network): renew_leases. - return self._original.slot_testv_and_readv_and_writev( + # Inspect the operation to determine its price based on any + # allocations. + required_new_passes = get_writev_price( + self._original, + self._pass_value, + storage_index, + tw_vectors, + now, + ) + + # Fail the operation right now if there aren't enough valid passes to + # cover the price. + if required_new_passes > len(validation.valid): + validation.raise_for(required_new_passes) + + # Perform the operation. + result = self._original.remote_slot_testv_and_readv_and_writev( storage_index, secrets, tw_vectors, r_vector, - renew_leases=renew_leases, + # Disable all lease renewal logic from the wrapped storage server. + # We'll add or renew leases based on our billing model. + renew_leases=False, ) + # Add the lease that we charged the client for: leases on all written + # shares without an unexpired lease. + add_leases_for_writev(self._original, storage_index, secrets, tw_vectors, now) + + # Propagate the result of the operation. + return result + def remote_slot_readv(self, *a, **kw): """ Pass-through without a pass check to let clients read mutable shares as @@ -697,3 +703,98 @@ def get_stat(sharepath): return stat_slot else: return stat_bucket + + +def add_leases_for_writev(storage_server, storage_index, secrets, tw_vectors, now): + """ + Add a new lease using the given secrets to all shares written by + ``tw_vectors``. + """ + for (sharenum, sharepath) in get_all_share_paths(storage_server, storage_index): + testv, datav, new_length = tw_vectors.get(sharenum, (None, b"", None)) + if datav or (new_length is not None): + # It has data or a new length - it is a write. + if share_has_active_leases(storage_server, storage_index, sharenum, now): + # It's fine, leave it be. + continue + + # Aha. It has no lease that hasn't expired. Give it one. + (write_enabler, renew_secret, cancel_secret) = secrets + share = get_share_file(sharepath) + share.add_or_renew_lease( + LeaseInfo( + owner_num=1, + renew_secret=renew_secret, + cancel_secret=cancel_secret, + expiration_time=now + ZKAPAuthorizerStorageServer.LEASE_PERIOD.total_seconds(), + nodeid=storage_server.my_nodeid, + ), + ) + +def get_share_path(storage_server, storage_index, sharenum): + # type: (StorageServer, bytes, int) -> FilePath + """ + Get the path to the given storage server's storage for the given share. + """ + return ( + FilePath(storage_server.sharedir) + .preauthChild(storage_index_to_dir(storage_index)) + .child(u"{}".format(sharenum)) + ) + + +def share_has_active_leases(storage_server, storage_index, sharenum, now): + # type: (StorageServer, bytes, int, float) -> bool + """ + Determine whether the given share on the given server has an unexpired + lease or not. + + :return: ``True`` if it has at least one unexpired lease, ``False`` + otherwise. + """ + sharepath = get_share_path(storage_server, storage_index, sharenum) + share = get_share_file(sharepath.path) + return any(lease.get_expiration_time() > now for lease in share.get_leases()) + + +def get_writev_price(storage_server, pass_value, storage_index, tw_vectors, now): + # type: (StorageServer, int, bytes, TestWriteVectors, float) -> int + """ + Determine the price to execute the given test/write vectors. + """ + # Find the current size of shares being written. + current_sizes = dict( + get_share_sizes( + storage_server, + storage_index, + # Consider the size of *all* shares even if they're not being + # written. If they have an unexpired lease then we can apply some + # or all of the remainder of the value of that lease towards this + # operation. + sharenums=None, + ), + ) + + # Zero out the size of any share without an unexpired lease. We will + # renew the lease on this share along with the write but the client + # must supply the necessary passes to do so. + current_sizes.update( + { + sharenum: 0 + for sharenum in current_sizes + if not share_has_active_leases( + storage_server, + storage_index, + sharenum, + now, + ) + } + ) + + # Compute the number of passes required to execute the given writev + # against these existing shares. + return get_required_new_passes_for_mutable_write( + pass_value, + current_sizes, + tw_vectors, + )