Skip to content
Snippets Groups Projects
_storage_server.py 17.1 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2019 PrivateStorage.io, LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    """
    A Tahoe-LAFS RIStorageServer-alike which authorizes writes and lease
    
    updates using per-call passes.
    
    
    This is the server part of a storage access protocol.  The client part is
    implemented in ``_storage_client.py``.
    
    from __future__ import (
        absolute_import,
    )
    
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
    from struct import (
        unpack,
    )
    
    
    from os.path import (
        join,
    )
    from os import (
        listdir,
    )
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
    from datetime import (
        timedelta,
    )
    
    from attr.validators import (
        provides,
    
    from zope.interface import (
    
    )
    from foolscap.api import (
    
        Referenceable,
    
    from foolscap.ipb import (
        IReferenceable,
        IRemotelyCallable,
    )
    
    from allmydata.interfaces import (
        RIStorageServer,
    )
    
    from allmydata.storage.common import (
        storage_index_to_dir,
    )
    
    from privacypass import (
        TokenPreimage,
        VerificationSignature,
        SigningKey,
    )
    
    
    from twisted.python.reflect import (
        namedAny,
    )
    from twisted.internet.interfaces import (
        IReactorTime,
    )
    
    
        RIPrivacyPassAuthorizedStorageServer,
    
    from .storage_common import (
    
        BYTES_PER_PASS,
        required_passes,
    
        allocate_buckets_message,
        add_lease_message,
        renew_lease_message,
        slot_testv_and_readv_and_writev_message,
    
        get_required_new_passes_for_mutable_write,
    
    # See allmydata/storage/mutable.py
    SLOT_HEADER_SIZE = 468
    LEASE_TRAILER_SIZE = 4
    
    
    class MorePassesRequired(Exception):
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
        """
        Storage operations fail with ``MorePassesRequired`` when they are not
        accompanied by a sufficient number of valid passes.
    
        :ivar int valid_count: The number of valid passes presented in the
            operation.
    
        ivar int required_count: The number of valid passes which must be
            presented for the operation to be authorized.
        """
    
        def __init__(self, valid_count, required_count):
            self.valid_count = valid_count
            self.required_count = required_count
    
    
        def __repr__(self):
            return "MorePassedRequired(valid_count={}, required_count={})".format(
                self.valid_count,
                self.required_count,
            )
    
        def __str__(self):
            return repr(self)
    
    
    class LeaseRenewalRequired(Exception):
        """
        Mutable write operations fail with ``LeaseRenewalRequired`` when the slot
        which is the target of the write does not have an active lease and no
        passes are supplied to create one.
        """
    
    
    
    @implementer_only(RIPrivacyPassAuthorizedStorageServer, IReferenceable, IRemotelyCallable)
    
    # It would be great to use `frozen=True` (value-based hashing) instead of
    # `cmp=False` (identity based hashing) but Referenceable wants to set some
    # attributes on self and it's hard to avoid that.
    @attr.s(cmp=False)
    
    class ZKAPAuthorizerStorageServer(Referenceable):
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
        """
    
        A class which wraps an ``RIStorageServer`` to insert pass validity checks
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
        before allowing certain functionality.
        """
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
    
        # This is the amount of time an added or renewed lease will last.  We
        # duplicate the value used by the underlying anonymous-access storage
        # server which does not expose it via a Python API or allow it to be
        # configured or overridden.  It would be great if the anonymous-access
        # storage server eventually made lease time a parameter so we could just
        # control it ourselves.
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
        LEASE_PERIOD = timedelta(days=31)
    
    
        _original = attr.ib(validator=provides(RIStorageServer))
    
        _signing_key = attr.ib(validator=instance_of(SigningKey))
    
        _clock = attr.ib(
            validator=provides(IReactorTime),
            default=attr.Factory(partial(namedAny, "twisted.internet.reactor")),
        )
    
        def _is_invalid_pass(self, message, pass_):
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            """
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            Cryptographically check the validity of a single pass.
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
    
    
            :param unicode message: The shared message for pass validation.
            :param bytes pass_: The encoded pass to validate.
    
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            :return bool: ``False`` (invalid) if the pass includes a valid
                signature, ``True`` (valid) otherwise.
    
            """
            assert isinstance(message, unicode), "message %r not unicode" % (message,)
            assert isinstance(pass_, bytes), "pass %r not bytes" % (pass_,)
            try:
                preimage_base64, signature_base64 = pass_.split(b" ")
                preimage = TokenPreimage.decode_base64(preimage_base64)
                proposed_signature = VerificationSignature.decode_base64(signature_base64)
                unblinded_token = self._signing_key.rederive_unblinded_token(preimage)
                verification_key = unblinded_token.derive_verification_key_sha512()
                invalid_pass = verification_key.invalid_sha512(proposed_signature, message.encode("utf-8"))
                return invalid_pass
            except Exception:
                # It would be pretty nice to log something here, sometimes, I guess?
                return True
    
        def _validate_passes(self, message, passes):
            """
            Check all of the given passes for validity.
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
    
    
            :param unicode message: The shared message for pass validation.
            :param list[bytes] passes: The encoded passes to validate.
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
    
    
            :return list[bytes]: The passes which are found to be valid.
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            """
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            result = list(
    
                pass_
                for pass_
                in passes
                if not self._is_invalid_pass(message, pass_)
            )
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            # print("{}: {} passes, {} valid".format(message, len(passes), len(result)))
            return result
    
        def remote_get_version(self):
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            """
    
            Pass-through without pass check to allow clients to learn about our
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            version and configuration in case it helps them decide how to behave.
            """
    
            return self._original.remote_get_version()
    
    
        def remote_allocate_buckets(self, passes, storage_index, renew_secret, cancel_secret, sharenums, allocated_size, canary):
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            """
    
            Pass-through after a pass check to ensure that clients can only allocate
            storage for immutable shares if they present valid passes.
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            """
    
            valid_passes = self._validate_passes(
                allocate_buckets_message(storage_index),
                passes,
            )
    
            check_pass_quantity_for_write(len(valid_passes), sharenums, allocated_size)
    
    
            return self._original.remote_allocate_buckets(
                storage_index,
                renew_secret,
                cancel_secret,
                sharenums,
                allocated_size,
                canary,
            )
    
    
        def remote_get_buckets(self, storage_index):
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            """
    
            Pass-through without pass check to let clients read immutable shares as
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            long as those shares exist.
            """
    
            return self._original.remote_get_buckets(storage_index)
    
        def remote_add_lease(self, passes, storage_index, *a, **kw):
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            """
    
            Pass-through after a pass check to ensure clients can only extend the
            duration of share storage if they present valid passes.
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            """
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            # print("server add_lease({}, {!r})".format(len(passes), storage_index))
            valid_passes = self._validate_passes(add_lease_message(storage_index), passes)
            allocated_sizes = dict(
                get_share_sizes(
                    self._original, storage_index,
                    list(get_all_share_numbers(self._original, storage_index)),
                ),
            ).values()
            # print("allocated_sizes: {}".format(allocated_sizes))
            check_pass_quantity(len(valid_passes), allocated_sizes)
            # print("Checked out")
    
            return self._original.remote_add_lease(storage_index, *a, **kw)
    
        def remote_renew_lease(self, passes, storage_index, *a, **kw):
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            """
    
            Pass-through after a pass check to ensure clients can only extend the
            duration of share storage if they present valid passes.
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            """
    
            self._validate_passes(renew_lease_message(storage_index), passes)
            return self._original.remote_renew_lease(storage_index, *a, **kw)
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
    
    
        def remote_advise_corrupt_share(self, *a, **kw):
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            """
    
            Pass-through without a pass check to let clients inform us of possible
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            issues with the system without incurring any cost to themselves.
            """
    
            return self._original.remote_advise_corrupt_share(*a, **kw)
    
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
        def remote_share_sizes(self, storage_index_or_slot, sharenums):
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
                get_share_sizes(self._original, storage_index_or_slot, sharenums)
    
        def remote_slot_testv_and_readv_and_writev(
                self,
    
                storage_index,
                secrets,
                tw_vectors,
                r_vector,
        ):
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            """
    
            Pass-through after a pass check to ensure clients can only allocate
            storage for mutable shares if they present valid passes.
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
    
            :note: This method can be used both to allocate storage and to rewrite
                data in already-allocated storage.  These cases may not be the
    
                same from the perspective of pass validation.
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            """
    
            # Only writes to shares without an active lease will result in a lease
            # renewal.
    
            renew_leases = False
    
            if has_writes(tw_vectors):
    
                # Passes may be supplied with the write to create the
                # necessary lease as part of the same operation.  This must be
                # supported because there is no separate protocol action to
                # *create* a slot.  Clients just begin writing to it.
                valid_passes = self._validate_passes(
                    slot_testv_and_readv_and_writev_message(storage_index),
                    passes,
                )
                if has_active_lease(self._original, storage_index, self._clock.seconds()):
    
                    # Some of the storage is paid for already.
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
                    current_sizes = dict(get_share_sizes(
    
                        self._original,
                        storage_index,
                        tw_vectors.keys(),
                    ))
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
                    # print("has writes, has active lease, current sizes: {}".format(current_sizes))
    
                    # None of it is.
                    current_sizes = {}
    
                required_new_passes = get_required_new_passes_for_mutable_write(
                    current_sizes,
                    tw_vectors,
                )
                if required_new_passes > len(valid_passes):
                    raise MorePassesRequired(len(valid_passes), required_new_passes)
    
    
            # Skip over the remotely exposed method and jump to the underlying
            # implementation which accepts one additional parameter that we know
            # about (and don't expose over the network): renew_leases.  We always
            # pass False for this because we want to manage leases completely
            # separately from writes.
    
            return self._original.slot_testv_and_readv_and_writev(
                storage_index,
                secrets,
                tw_vectors,
                r_vector,
    
    
        def remote_slot_readv(self, *a, **kw):
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            """
    
            Pass-through without a pass check to let clients read mutable shares as
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
            long as those shares exist.
            """
    
            return self._original.remote_slot_readv(*a, **kw)
    
    
    
    def has_active_lease(storage_server, storage_index, now):
        """
        :param allmydata.storage.server.StorageServer storage_server: A storage
            server to use to look up lease information.
    
        :param bytes storage_index: A storage index to use to look up lease
            information.
    
        :param float now: The current time as a POSIX timestamp.
    
        :return bool: ``True`` if any only if the given storage index has a lease
            with an expiration time after ``now``.
        """
        leases = storage_server.get_slot_leases(storage_index)
        return any(
            lease.get_expiration_time() > now
            for lease
            in leases
        )
    
    
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
    def check_pass_quantity(valid_count, share_sizes):
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
        """
        Check that the given number of passes is sufficient to cover leases for
        one period for shares of the given sizes.
    
        :param int valid_count: The number of passes.
        :param list[int] share_sizes: The sizes of the shares for which the lease
            is being created.
    
        :raise MorePassesRequired: If the given number of passes is too few for
            the given share sizes.
    
        :return: ``None`` if the given number of passes is sufficient.
        """
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
        required_pass_count = required_passes(BYTES_PER_PASS, share_sizes)
        if valid_count < required_pass_count:
            raise MorePassesRequired(
                valid_count,
                required_pass_count,
            )
    
    
    def check_pass_quantity_for_write(valid_count, sharenums, allocated_size):
        """
        Determine if the given number of valid passes is sufficient for an
        attempted write.
    
        :param int valid_count: The number of valid passes to consider.
        :param set[int] sharenums: The shares being written to.
        :param int allocated_size: The size of each share.
    
        :raise MorePassedRequired: If the number of valid passes given is too
            small.
    
        :return: ``None`` if the number of valid passes given is sufficient.
        """
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
        check_pass_quantity(valid_count, [allocated_size] * len(sharenums))
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
    def get_all_share_paths(storage_server, storage_index):
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
        """
        Get the paths of all shares in the given storage index (or slot).
    
        :param allmydata.storage.server.StorageServer storage_server: The storage
            server which owns the storage index.
    
        :param bytes storage_index: The storage index (or slot) in which to look
            up shares.
    
        :return: A generator of tuples of (int, bytes) giving a share number and
            the path to storage for that share number.
        """
    
        bucket = join(storage_server.sharedir, storage_index_to_dir(storage_index))
    
        try:
            contents = listdir(bucket)
        except OSError as e:
            if e.errno == ENOENT:
                return
            raise
    
        for candidate in contents:
    
            try:
                sharenum = int(candidate)
            except ValueError:
                pass
            else:
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
                yield sharenum, join(bucket, candidate)
    
    
    def get_all_share_numbers(storage_server, storage_index):
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
        """
        Get all share numbers in the given storage index (or slot).
    
        :param allmydata.storage.server.StorageServer storage_server: The storage
            server which owns the storage index.
    
        :param bytes storage_index: The storage index (or slot) in which to look
            up share numbers.
    
        :return: A generator of int giving share numbers.
        """
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
        for sharenum, sharepath in get_all_share_paths(storage_server, storage_index):
            yield sharenum
    
    
    def get_share_sizes(storage_server, storage_index_or_slot, sharenums):
        """
        Get the sizes of the given share numbers for the given storage index *or*
        slot.
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
    
        :param allmydata.storage.server.StorageServer storage_server: The storage
            server which owns the storage index.
    
        :param bytes storage_index_or_slot: The storage index (or slot) in which
            to look up share numbers.
    
        :param sharenums: A container of share numbers to use to filter the
            results.  Only information about share numbers in this container is
            included in the result.
    
        :return: A generator of tuples of (int, int) where the first element is a
            share number and the second element is the data size for that share
            number.
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
        """
        get_size = None
        for sharenum, sharepath in get_all_share_paths(storage_server, storage_index_or_slot):
            if get_size is None:
                # Figure out if it is a storage index or a slot.
                with open(sharepath) as share_file:
                    magic = share_file.read(32)
                    if magic == "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e":
                        get_size = get_slot_share_size
    
    Jean-Paul Calderone's avatar
    Jean-Paul Calderone committed
                        get_size = get_storage_index_share_size
            yield sharenum, get_size(sharepath)
    
    
    def get_storage_index_share_size(sharepath):
        with open(sharepath) as share_file:
            share_data_length_bytes = share_file.read(8)[4:]
            (share_data_length,) = unpack('>L', share_data_length_bytes)
            return share_data_length
    
    
    def get_slot_share_size(sharepath):
        with open(sharepath) as share_file:
            share_data_length_bytes = share_file.read(92)[-8:]
            (share_data_length,) = unpack('>Q', share_data_length_bytes)
            return share_data_length
    
    # I don't understand why this is required.
    
    # ZKAPAuthorizerStorageServer is-a Referenceable.  It seems like
    
    # the built in adapter should take care of this case.
    from twisted.python.components import (
        registerAdapter,
    )
    from foolscap.referenceable import (
        ReferenceableSlicer,
    )
    from foolscap.ipb import (
        ISlicer,
    )
    
    registerAdapter(ReferenceableSlicer, ZKAPAuthorizerStorageServer, ISlicer)