Skip to content
Snippets Groups Projects
_storage_server.py 29.5 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2019 PrivateStorage.io, LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    """
    A Tahoe-LAFS RIStorageServer-alike which authorizes writes and lease
    
    updates using per-call passes.
    
    
    This is the server part of a storage access protocol.  The client part is
    implemented in ``_storage_client.py``.
    
    Tom Prince's avatar
    Tom Prince committed
    from __future__ import absolute_import
    
    Tom Prince's avatar
    Tom Prince committed
    from datetime import timedelta
    from errno import ENOENT
    from functools import partial
    from os import listdir, stat
    from os.path import join
    from struct import calcsize, unpack
    
    Tom Prince's avatar
    Tom Prince committed
    from allmydata.interfaces import RIStorageServer
    from allmydata.storage.common import storage_index_to_dir
    
    from allmydata.storage.immutable import ShareFile
    
    from allmydata.storage.lease import LeaseInfo
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
    from allmydata.storage.mutable import MutableShareFile
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
    from allmydata.storage.shares import get_share_file
    
    Tom Prince's avatar
    Tom Prince committed
    from allmydata.util.base32 import b2a
    from attr.validators import instance_of, provides
    from challenge_bypass_ristretto import SigningKey, TokenPreimage, VerificationSignature
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
    from eliot import log_call, start_action
    
    Tom Prince's avatar
    Tom Prince committed
    from foolscap.api import Referenceable
    from twisted.internet.defer import Deferred
    from twisted.internet.interfaces import IReactorTime
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
    from twisted.python.filepath import FilePath
    
    Tom Prince's avatar
    Tom Prince committed
    from twisted.python.reflect import namedAny
    
    from zope.interface import implementer
    
    Tom Prince's avatar
    Tom Prince committed
    
    from .foolscap import RIPrivacyPassAuthorizedStorageServer, ShareStat
    
    from .storage_common import (
    
    Tom Prince's avatar
    Tom Prince committed
        add_lease_message,
        allocate_buckets_message,
        get_required_new_passes_for_mutable_write,
    
        slot_testv_and_readv_and_writev_message,
    )
    
    # See allmydata/storage/mutable.py
    SLOT_HEADER_SIZE = 468
    LEASE_TRAILER_SIZE = 4
    
    
    Tom Prince's avatar
    Tom Prince committed
    
    
        The result of validating a list of passes.
    
        :ivar list[int] valid: A list of indexes (into the validated list) of which
            are acceptable.
    
        :ivar list[int] signature_check_failed: A list of indexes (into the
            validated list) of passes which did not have a correct signature.
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
        """
    
    Tom Prince's avatar
    Tom Prince committed
    
    
        valid = attr.ib()
        signature_check_failed = attr.ib()
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
    
    
        @classmethod
        def _is_invalid_pass(cls, message, pass_, signing_key):
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            """
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            Cryptographically check the validity of a single pass.
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
    
    
            :param unicode message: The shared message for pass validation.
            :param bytes pass_: The encoded pass to validate.
    
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            :return bool: ``False`` (invalid) if the pass includes a valid
                signature, ``True`` (valid) otherwise.
    
            """
            assert isinstance(message, unicode), "message %r not unicode" % (message,)
            assert isinstance(pass_, bytes), "pass %r not bytes" % (pass_,)
            try:
                preimage_base64, signature_base64 = pass_.split(b" ")
                preimage = TokenPreimage.decode_base64(preimage_base64)
                proposed_signature = VerificationSignature.decode_base64(signature_base64)
    
                unblinded_token = signing_key.rederive_unblinded_token(preimage)
    
                verification_key = unblinded_token.derive_verification_key_sha512()
    
    Tom Prince's avatar
    Tom Prince committed
                invalid_pass = verification_key.invalid_sha512(
                    proposed_signature, message.encode("utf-8")
                )
    
                return invalid_pass
            except Exception:
                # It would be pretty nice to log something here, sometimes, I guess?
                return True
    
    
        @classmethod
        def validate_passes(cls, message, passes, signing_key):
    
            """
            Check all of the given passes for validity.
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
    
    
            :param unicode message: The shared message for pass validation.
            :param list[bytes] passes: The encoded passes to validate.
    
            :param SigningKey signing_key: The signing key to use to check the passes.
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
    
    
            :return: An instance of this class describing the validation result
                for all passes given.
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            """
    
            valid = []
            signature_check_failed = []
            for idx, pass_ in enumerate(passes):
                if cls._is_invalid_pass(message, pass_, signing_key):
                    signature_check_failed.append(idx)
                else:
                    valid.append(idx)
            return cls(
                valid=valid,
                signature_check_failed=signature_check_failed,
    
    
        def raise_for(self, required_pass_count):
            """
            :raise MorePassesRequired: Always raised with fields populated from this
                instance and the given ``required_pass_count``.
            """
            raise MorePassesRequired(
                len(self.valid),
                required_pass_count,
                self.signature_check_failed,
            )
    
    
    class LeaseRenewalRequired(Exception):
        """
        Mutable write operations fail with ``LeaseRenewalRequired`` when the slot
        which is the target of the write does not have an active lease and no
        passes are supplied to create one.
        """
    
    
    
    @implementer(RIPrivacyPassAuthorizedStorageServer)
    
    # It would be great to use `frozen=True` (value-based hashing) instead of
    # `cmp=False` (identity based hashing) but Referenceable wants to set some
    # attributes on self and it's hard to avoid that.
    @attr.s(cmp=False)
    class ZKAPAuthorizerStorageServer(Referenceable):
        """
        A class which wraps an ``RIStorageServer`` to insert pass validity checks
        before allowing certain functionality.
        """
    
        # This is the amount of time an added or renewed lease will last.  We
        # duplicate the value used by the underlying anonymous-access storage
        # server which does not expose it via a Python API or allow it to be
        # configured or overridden.  It would be great if the anonymous-access
        # storage server eventually made lease time a parameter so we could just
        # control it ourselves.
        LEASE_PERIOD = timedelta(days=31)
    
        _original = attr.ib(validator=provides(RIStorageServer))
        _pass_value = pass_value_attribute()
        _signing_key = attr.ib(validator=instance_of(SigningKey))
        _clock = attr.ib(
            validator=provides(IReactorTime),
            default=attr.Factory(partial(namedAny, "twisted.internet.reactor")),
        )
    
        def remote_get_version(self):
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            """
    
            Pass-through without pass check to allow clients to learn about our
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            version and configuration in case it helps them decide how to behave.
            """
    
            return self._original.remote_get_version()
    
    
    Tom Prince's avatar
    Tom Prince committed
        def remote_allocate_buckets(
            self,
            passes,
            storage_index,
            renew_secret,
            cancel_secret,
            sharenums,
            allocated_size,
            canary,
        ):
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            """
    
            Pass-through after a pass check to ensure that clients can only allocate
            storage for immutable shares if they present valid passes.
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            """
    
            validation = _ValidationResult.validate_passes(
    
                allocate_buckets_message(storage_index),
                passes,
    
    
            # Note: The *allocate_buckets* protocol allows for some shares to
            # already exist on the server.  When this is the case, the cost of the
    
            # operation is based only on the shares which are really allocated
    
            # here.  It's not clear if we can allow the client to supply the
            # reduced number of passes in the call but we can be sure to only mark
            # as spent enough passes to cover the allocated buckets.  The return
            # value of the method will tell the client what the true cost was and
            # they can update their books in the same way.
            #
            # "Spending" isn't implemented yet so there is no code here to deal
            # with this fact (though the client does do the necessary bookkeeping
            # already).  See
            # https://github.com/PrivateStorageio/ZKAPAuthorizer/issues/41.
            #
            # Note: The downside of this scheme is that the client has revealed
            # some tokens to us.  If we act in bad faith we can use this
            # information to correlate this operation with a future operation
            # where they are re-spent.  We don't do this but it would be better if
            # we fixed the protocol so it's not even possible.  Probably should
            # file a ticket for this.
    
            check_pass_quantity_for_write(
                self._pass_value,
    
            alreadygot, bucketwriters = self._original._allocate_buckets(
    
                storage_index,
                renew_secret,
                cancel_secret,
                sharenums,
                allocated_size,
    
            # Copy/paste the disconnection handling logic from
            # StorageServer.remote_allocate_buckets.
            for bw in bucketwriters.values():
                disconnect_marker = canary.notifyOnDisconnect(bw.disconnected)
                self._original._bucket_writer_disconnect_markers[bw] = (
                    canary,
                    disconnect_marker,
                )
            return alreadygot, bucketwriters
    
    
        def remote_get_buckets(self, storage_index):
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            """
    
            Pass-through without pass check to let clients read immutable shares as
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            long as those shares exist.
            """
    
            return self._original.remote_get_buckets(storage_index)
    
        def remote_add_lease(self, passes, storage_index, *a, **kw):
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            """
    
            Pass-through after a pass check to ensure clients can only extend the
            duration of share storage if they present valid passes.
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            """
    
            validation = _ValidationResult.validate_passes(
                add_lease_message(storage_index),
                passes,
                self._signing_key,
            )
    
            return self._original.remote_add_lease(storage_index, *a, **kw)
    
        def remote_advise_corrupt_share(self, *a, **kw):
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            """
    
            Pass-through without a pass check to let clients inform us of possible
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            issues with the system without incurring any cost to themselves.
            """
    
            return self._original.remote_advise_corrupt_share(*a, **kw)
    
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
        def remote_share_sizes(self, storage_index_or_slot, sharenums):
    
            with start_action(
    
    Tom Prince's avatar
    Tom Prince committed
                action_type=u"zkapauthorizer:storage-server:remote:share-sizes",
                storage_index_or_slot=storage_index_or_slot,
    
            ):
                return dict(
                    get_share_sizes(self._original, storage_index_or_slot, sharenums)
                )
    
        def remote_stat_shares(self, storage_indexes_or_slots):
    
            # type: (List[bytes]) -> List[Dict[int, ShareStat]]
    
                dict(get_share_stats(self._original, storage_index_or_slot, None))
    
    Tom Prince's avatar
    Tom Prince committed
                for storage_index_or_slot in storage_indexes_or_slots
    
        def remote_slot_testv_and_readv_and_writev(
    
    Tom Prince's avatar
    Tom Prince committed
            self,
            passes,
            storage_index,
            secrets,
            tw_vectors,
            r_vector,
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            """
    
            Perform a test-and-set on a number of shares in a given slot.
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
    
    
            Optionally, also read some data to be returned before writing any
            changes.
    
            If storage-time will be allocated by the operation then validate the
            given passes and ensure they are of sufficient quantity to pay for the
            allocation.
    
            Specifically, passes are required in the following cases:
    
            * If shares are created then a lease is added to them.
              Passes are required for the full size of the share.
    
            * If shares without unexpired leases are written then a lease is added to them.
              Passes are required for the full size of the shares after the write.
    
            * If shares with unexpired leases are made larger.
              Passes are required for the difference in price between the old and new size.
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            """
    
            with start_action(
    
    Tom Prince's avatar
    Tom Prince committed
                action_type=u"zkapauthorizer:storage-server:remote:slot-testv-and-readv-and-writev",
                storage_index=b2a(storage_index),
                path=storage_index_to_dir(storage_index),
    
            ):
                result = self._slot_testv_and_readv_and_writev(
                    passes,
                    storage_index,
                    secrets,
                    tw_vectors,
                    r_vector,
                )
                if isinstance(result, Deferred):
                    raise TypeError("_slot_testv_and_readv_and_writev returned Deferred")
                return result
    
        def _slot_testv_and_readv_and_writev(
    
    Tom Prince's avatar
    Tom Prince committed
            self,
            passes,
            storage_index,
            secrets,
            tw_vectors,
            r_vector,
    
            # Get a stable time to use for all lease expiration checks that are
            # part of this call.
            now = self._clock.seconds()
    
            # Check passes for cryptographic validity.
            validation = _ValidationResult.validate_passes(
                slot_testv_and_readv_and_writev_message(storage_index),
                passes,
                self._signing_key,
            )
    
            # Inspect the operation to determine its price based on any
            # allocations.
            required_new_passes = get_writev_price(
                self._original,
                self._pass_value,
                storage_index,
                tw_vectors,
                now,
            )
    
            # Fail the operation right now if there aren't enough valid passes to
            # cover the price.
            if required_new_passes > len(validation.valid):
                validation.raise_for(required_new_passes)
    
            # Perform the operation.
            result = self._original.remote_slot_testv_and_readv_and_writev(
    
                storage_index,
                secrets,
                tw_vectors,
                r_vector,
    
                # Disable all lease renewal logic from the wrapped storage server.
                # We'll add or renew leases based on our billing model.
                renew_leases=False,
    
            # Add the leases that we charged the client for.  This includes:
            #
            #  - leases on newly created shares
            #
            #  - leases on existing, modified shares without an unexpired lease
            #
            # Note it does not include existing shares that grew enough to be more
            # expensive.  The operation was required to pay the full price
            # difference but this only grants storage for the remainder of the
            # existing lease period.  This results in the client being overcharged
            # somewhat.
    
            add_leases_for_writev(self._original, storage_index, secrets, tw_vectors, now)
    
            # Propagate the result of the operation.
            return result
    
    
        def remote_slot_readv(self, *a, **kw):
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            """
    
            Pass-through without a pass check to let clients read mutable shares as
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            long as those shares exist.
            """
    
            return self._original.remote_slot_readv(*a, **kw)
    
    
    
    def has_active_lease(storage_server, storage_index, now):
        """
        :param allmydata.storage.server.StorageServer storage_server: A storage
            server to use to look up lease information.
    
        :param bytes storage_index: A storage index to use to look up lease
            information.
    
        :param float now: The current time as a POSIX timestamp.
    
        :return bool: ``True`` if any only if the given storage index has a lease
            with an expiration time after ``now``.
        """
        leases = storage_server.get_slot_leases(storage_index)
    
    Tom Prince's avatar
    Tom Prince committed
        return any(lease.get_expiration_time() > now for lease in leases)
    
    def check_pass_quantity(pass_value, validation, share_sizes):
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
        """
        Check that the given number of passes is sufficient to cover leases for
        one period for shares of the given sizes.
    
    
        :param int pass_value: The value of a single pass in bytes × lease periods.
    
        :param _ValidationResult validation: The validating results for a list of passes.
    
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
        :param list[int] share_sizes: The sizes of the shares for which the lease
            is being created.
    
        :raise MorePassesRequired: If the given number of passes is too few for
            the given share sizes.
    
        :return: ``None`` if the given number of passes is sufficient.
        """
    
        required_pass_count = required_passes(pass_value, share_sizes)
    
        if len(validation.valid) < required_pass_count:
            validation.raise_for(required_pass_count)
    
    Tom Prince's avatar
    Tom Prince committed
    def check_pass_quantity_for_lease(
        pass_value, storage_index, validation, storage_server
    ):
    
        """
        Check that the given number of passes is sufficient to add or renew a
        lease for one period for the given storage index.
    
    
        :param int pass_value: The value of a single pass in bytes × lease periods.
    
        :param _ValidationResult validation: The validating results for a list of passes.
    
        :raise MorePassesRequired: If the given number of passes is too few for
            the share sizes at the given storage index.
    
        :return: ``None`` if the given number of passes is sufficient.
    
        """
        allocated_sizes = dict(
            get_share_sizes(
                storage_server,
                storage_index,
                list(get_all_share_numbers(storage_server, storage_index)),
            ),
        ).values()
    
        check_pass_quantity(pass_value, validation, allocated_sizes)
    
    def check_pass_quantity_for_write(pass_value, validation, sharenums, allocated_size):
    
        """
        Determine if the given number of valid passes is sufficient for an
        attempted write.
    
    
        :param int pass_value: The value of a single pass in bytes × lease periods.
    
        :param _ValidationResult validation: The validating results for a list of passes.
    
    
        :param set[int] sharenums: The shares being written to.
    
        :param int allocated_size: The size of each share.
    
        :raise MorePassedRequired: If the number of valid passes given is too
            small.
    
        :return: ``None`` if the number of valid passes given is sufficient.
        """
    
        check_pass_quantity(pass_value, validation, [allocated_size] * len(sharenums))
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
    def get_all_share_paths(storage_server, storage_index):
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
        """
        Get the paths of all shares in the given storage index (or slot).
    
        :param allmydata.storage.server.StorageServer storage_server: The storage
            server which owns the storage index.
    
        :param bytes storage_index: The storage index (or slot) in which to look
            up shares.
    
        :return: A generator of tuples of (int, bytes) giving a share number and
            the path to storage for that share number.
        """
    
        bucket = join(storage_server.sharedir, storage_index_to_dir(storage_index))
    
        try:
            contents = listdir(bucket)
        except OSError as e:
            if e.errno == ENOENT:
                return
            raise
    
        for candidate in contents:
    
            try:
                sharenum = int(candidate)
            except ValueError:
                pass
            else:
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
                yield sharenum, join(bucket, candidate)
    
    
    def get_all_share_numbers(storage_server, storage_index):
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
        """
        Get all share numbers in the given storage index (or slot).
    
        :param allmydata.storage.server.StorageServer storage_server: The storage
            server which owns the storage index.
    
        :param bytes storage_index: The storage index (or slot) in which to look
            up share numbers.
    
        :return: A generator of int giving share numbers.
        """
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
        for sharenum, sharepath in get_all_share_paths(storage_server, storage_index):
            yield sharenum
    
    
    
    @log_call(
        action_type="zkapauthorizer:storage-server:get-share-sizes",
        include_args=["storage_index_or_slot", "sharenums"],
    )
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
    def get_share_sizes(storage_server, storage_index_or_slot, sharenums):
        """
    
        Get sizes of the given share numbers for the given storage index *or*
        slot.
    
        :see: ``get_share_stats``
    
    
        :return: A list of tuples of (int, int) where the first element is a share
            number and the second element is the data size for that share number.
    
            (sharenum, stat.size)
    
    Tom Prince's avatar
    Tom Prince committed
            for (sharenum, stat) in get_share_stats(
                storage_server, storage_index_or_slot, sharenums
            )
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
    
    
    def get_share_stats(storage_server, storage_index_or_slot, sharenums):
        """
        Get the stats for the given share numbers for the given storage index *or*
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
        slot.
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
    
        :param allmydata.storage.server.StorageServer storage_server: The storage
            server which owns the storage index.
    
        :param bytes storage_index_or_slot: The storage index (or slot) in which
            to look up share numbers.
    
        :param sharenums: A container of share numbers to use to filter the
            results.  Only information about share numbers in this container is
    
            included in the result.  Or, ``None`` to get sizes for all shares
            which exist.
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
    
    
        :return: A generator of tuples of (int, ShareStat) where the first element
            is a share number and the second element gives stats about that share.
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
        """
    
    Tom Prince's avatar
    Tom Prince committed
        for sharenum, sharepath in get_all_share_paths(
            storage_server, storage_index_or_slot
        ):
    
            if stat is None:
                stat = get_stat(sharepath)
    
            if sharenums is None or sharenum in sharenums:
    
                info = stat(storage_server, storage_index_or_slot, sharepath)
                yield sharenum, info
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
    
    
    def get_storage_index_share_size(sharepath):
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
        """
        Get the size of a share belonging to a storage index (an immutable share).
    
        :param bytes sharepath: The path to the share file.
    
        :return int: The data size of the share in bytes.
        """
    
        # From src/allmydata/storage/immutable.py
        #
        # The share file has the following layout:
        #  0x00: share file version number, four bytes, current version is 1
        #  0x04: share data length, four bytes big-endian = A # See Footnote 1 below.
        #  0x08: number of leases, four bytes big-endian
        #  0x0c: beginning of share data (see immutable.layout.WriteBucketProxy)
        #  A+0x0c = B: first lease. Lease format is:
        #   B+0x00: owner number, 4 bytes big-endian, 0 is reserved for no-owner
        #   B+0x04: renew secret, 32 bytes (SHA256)
        #   B+0x24: cancel secret, 32 bytes (SHA256)
        #   B+0x44: expiration time, 4 bytes big-endian seconds-since-epoch
        #   B+0x48: next lease, or end of record
        #
        # Footnote 1: as of Tahoe v1.3.0 this field is not used by storage
        # servers, but it is still filled in by storage servers in case the
        # storage server software gets downgraded from >= Tahoe v1.3.0 to < Tahoe
        # v1.3.0, or the share file is moved from one storage server to
        # another. The value stored in this field is truncated, so if the actual
        # share data length is >= 2**32, then the value stored in this field will
        # be the actual share data length modulo 2**32.
    
        share_file_size = stat(sharepath).st_size
        header_format = ">LLL"
    
        header_size = calcsize(header_format)
    
        with open(sharepath, "rb") as share_file:
    
            header = share_file.read(calcsize(header_format))
    
        if len(header) != header_size:
    
                "Tried to read {} bytes of share file header, got {!r} instead.".format(
    
                    calcsize(header_format),
    
        version, _, number_of_leases = unpack(header_format, header)
    
        if version != 1:
            raise ValueError(
                "Cannot interpret version {} share file.".format(version),
            )
    
    
        return share_file_size - header_size - (number_of_leases * (4 + 32 + 32 + 4))
    
    def stat_bucket(storage_server, storage_index, sharepath):
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
        """
        Get a ``ShareStat`` for the shares in a bucket.
        """
    
        return ShareStat(
            size=get_storage_index_share_size(sharepath),
    
            lease_expiration=get_lease_expiration(sharepath),
    
        )
    
    
    def stat_slot(storage_server, slot, sharepath):
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
        """
        Get a ``ShareStat`` for the shares in a slot.
        """
    
        return ShareStat(
            size=get_slot_share_size(sharepath),
    
            lease_expiration=get_lease_expiration(sharepath),
    
    def get_lease_expiration(sharepath):
        # type: (str) -> Optional[int]
        """
        Get the latest lease expiration time for the share at the given path, or
        ``None`` if there are no leases on it.
    
        :param sharepath: The path to the share file to inspect.
        """
        leases = list(
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            lease.get_expiration_time() for lease in get_share_file(sharepath).get_leases()
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
    def get_slot_share_size(sharepath):
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
        """
        Get the size of a share belonging to a slot (a mutable share).
    
        :param bytes sharepath: The path to the share file.
    
        :return int: The data size of the share in bytes.
        """
    
        with open(sharepath, "rb") as share_file:
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            share_data_length_bytes = share_file.read(92)[-8:]
    
    Tom Prince's avatar
    Tom Prince committed
            (share_data_length,) = unpack(">Q", share_data_length_bytes)
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            return share_data_length
    
    def get_stat(sharepath):
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
        """
        Get a function that can retrieve the metadata from the share at the given
        path.
    
        This is necessary to differentiate between buckets and slots.
        """
    
        # Figure out if it is a storage index or a slot.
    
        with open(sharepath, "rb") as share_file:
    
            magic = share_file.read(32)
            if magic == "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e":
                return stat_slot
            else:
                return stat_bucket
    
    
    
    def add_leases_for_writev(storage_server, storage_index, secrets, tw_vectors, now):
        """
        Add a new lease using the given secrets to all shares written by
        ``tw_vectors``.
        """
        for (sharenum, sharepath) in get_all_share_paths(storage_server, storage_index):
            testv, datav, new_length = tw_vectors.get(sharenum, (None, b"", None))
            if datav or (new_length is not None):
                # It has data or a new length - it is a write.
                if share_has_active_leases(storage_server, storage_index, sharenum, now):
                    # It's fine, leave it be.
                    continue
    
                # Aha.  It has no lease that hasn't expired.  Give it one.
                (write_enabler, renew_secret, cancel_secret) = secrets
                share = get_share_file(sharepath)
                share.add_or_renew_lease(
                    LeaseInfo(
                        owner_num=1,
                        renew_secret=renew_secret,
                        cancel_secret=cancel_secret,
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
                        expiration_time=now
                        + ZKAPAuthorizerStorageServer.LEASE_PERIOD.total_seconds(),
    
                        nodeid=storage_server.my_nodeid,
                    ),
                )
    
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
    
    
    def get_share_path(storage_server, storage_index, sharenum):
        # type: (StorageServer, bytes, int) -> FilePath
        """
        Get the path to the given storage server's storage for the given share.
        """
        return (
            FilePath(storage_server.sharedir)
            .preauthChild(storage_index_to_dir(storage_index))
            .child(u"{}".format(sharenum))
        )
    
    
    def share_has_active_leases(storage_server, storage_index, sharenum, now):
        # type: (StorageServer, bytes, int, float) -> bool
        """
        Determine whether the given share on the given server has an unexpired
        lease or not.
    
        :return: ``True`` if it has at least one unexpired lease, ``False``
            otherwise.
        """
        sharepath = get_share_path(storage_server, storage_index, sharenum)
        share = get_share_file(sharepath.path)
        return any(lease.get_expiration_time() > now for lease in share.get_leases())
    
    
    def get_writev_price(storage_server, pass_value, storage_index, tw_vectors, now):
        # type: (StorageServer, int, bytes, TestWriteVectors, float) -> int
        """
        Determine the price to execute the given test/write vectors.
        """
        # Find the current size of shares being written.
        current_sizes = dict(
            get_share_sizes(
                storage_server,
                storage_index,
                # Consider the size of *all* shares even if they're not being
                # written.  If they have an unexpired lease then we can apply some
                # or all of the remainder of the value of that lease towards this
                # operation.
    
                #
                # For example, if there are 5 0.5 MB shares already present then 5
                # ZKAPs (assuming a ZKAP value of 1 MB-month) have been spent to
                # store them while only 2.5 MB of storage has been allocated.
                # This means we could accept another 2.5 MB of data for storage at
                # no cost to the client while still having been completely paid
                # for the storage committed.
                #
                # This is sort of a weird trick to try to reduce the impact of
                # ZKAP value lost to the rounding scheme in use.  There's a good
                # chance there are better solutions waiting to be discovered that
                # would obsolete this logic.  Note that whatever the logic here,
                # it must agree with the client-side logic so that both server and
                # client come to the same conclusion regarding price.
    
                sharenums=None,
            ),
        )
    
        # Zero out the size of any share without an unexpired lease.  We will
        # renew the lease on this share along with the write but the client
        # must supply the necessary passes to do so.
        current_sizes.update(
            {
                sharenum: 0
                for sharenum in current_sizes
                if not share_has_active_leases(
                    storage_server,
                    storage_index,
                    sharenum,
                    now,
                )
            }
        )
    
        # Compute the number of passes required to execute the given writev
        # against these existing shares.
        return get_required_new_passes_for_mutable_write(
            pass_value,
            current_sizes,
            tw_vectors,
        )