Newer
Older
# Copyright 2019 PrivateStorage.io, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
A Tahoe-LAFS RIStorageServer-alike which authorizes writes and lease
This is the server part of a storage access protocol. The client part is
implemented in ``_storage_client.py``.
from __future__ import (
absolute_import,
)
from errno import (
ENOENT,
)
from functools import (
partial,
)
Jean-Paul Calderone
committed
from os.path import (
join,
)
from os import (
listdir,
)
import attr
from attr.validators import (
provides,
instance_of,
from foolscap.ipb import (
IReferenceable,
IRemotelyCallable,
)
from allmydata.interfaces import (
RIStorageServer,
)
Jean-Paul Calderone
committed
from allmydata.storage.common import (
storage_index_to_dir,
)
from privacypass import (
TokenPreimage,
VerificationSignature,
SigningKey,
)
from twisted.python.reflect import (
namedAny,
)
from twisted.internet.interfaces import (
IReactorTime,
)
from .foolscap import (
from .storage_common import (
BYTES_PER_PASS,
required_passes,
allocate_buckets_message,
add_lease_message,
renew_lease_message,
slot_testv_and_readv_and_writev_message,
get_required_new_passes_for_mutable_write,
# See allmydata/storage/mutable.py
SLOT_HEADER_SIZE = 468
LEASE_TRAILER_SIZE = 4
class MorePassesRequired(Exception):
"""
Storage operations fail with ``MorePassesRequired`` when they are not
accompanied by a sufficient number of valid passes.
:ivar int valid_count: The number of valid passes presented in the
operation.
ivar int required_count: The number of valid passes which must be
presented for the operation to be authorized.
"""
def __init__(self, valid_count, required_count):
self.valid_count = valid_count
self.required_count = required_count
def __repr__(self):
return "MorePassedRequired(valid_count={}, required_count={})".format(
self.valid_count,
self.required_count,
)
def __str__(self):
return repr(self)
class LeaseRenewalRequired(Exception):
"""
Mutable write operations fail with ``LeaseRenewalRequired`` when the slot
which is the target of the write does not have an active lease and no
passes are supplied to create one.
"""
@implementer_only(RIPrivacyPassAuthorizedStorageServer, IReferenceable, IRemotelyCallable)
# It would be great to use `frozen=True` (value-based hashing) instead of
# `cmp=False` (identity based hashing) but Referenceable wants to set some
# attributes on self and it's hard to avoid that.
@attr.s(cmp=False)
class ZKAPAuthorizerStorageServer(Referenceable):
A class which wraps an ``RIStorageServer`` to insert pass validity checks
# This is the amount of time an added or renewed lease will last. We
# duplicate the value used by the underlying anonymous-access storage
# server which does not expose it via a Python API or allow it to be
# configured or overridden. It would be great if the anonymous-access
# storage server eventually made lease time a parameter so we could just
# control it ourselves.
_original = attr.ib(validator=provides(RIStorageServer))
_signing_key = attr.ib(validator=instance_of(SigningKey))
_clock = attr.ib(
validator=provides(IReactorTime),
default=attr.Factory(partial(namedAny, "twisted.internet.reactor")),
)
def _is_invalid_pass(self, message, pass_):
:param unicode message: The shared message for pass validation.
:param bytes pass_: The encoded pass to validate.
:return bool: ``False`` (invalid) if the pass includes a valid
signature, ``True`` (valid) otherwise.
"""
assert isinstance(message, unicode), "message %r not unicode" % (message,)
assert isinstance(pass_, bytes), "pass %r not bytes" % (pass_,)
try:
preimage_base64, signature_base64 = pass_.split(b" ")
preimage = TokenPreimage.decode_base64(preimage_base64)
proposed_signature = VerificationSignature.decode_base64(signature_base64)
unblinded_token = self._signing_key.rederive_unblinded_token(preimage)
verification_key = unblinded_token.derive_verification_key_sha512()
invalid_pass = verification_key.invalid_sha512(proposed_signature, message.encode("utf-8"))
return invalid_pass
except Exception:
# It would be pretty nice to log something here, sometimes, I guess?
return True
def _validate_passes(self, message, passes):
"""
Check all of the given passes for validity.
:param unicode message: The shared message for pass validation.
:param list[bytes] passes: The encoded passes to validate.
:return list[bytes]: The passes which are found to be valid.
pass_
for pass_
in passes
if not self._is_invalid_pass(message, pass_)
)
# print("{}: {} passes, {} valid".format(message, len(passes), len(result)))
return result
Pass-through without pass check to allow clients to learn about our
version and configuration in case it helps them decide how to behave.
"""
return self._original.remote_get_version()
def remote_allocate_buckets(self, passes, storage_index, renew_secret, cancel_secret, sharenums, allocated_size, canary):
Pass-through after a pass check to ensure that clients can only allocate
storage for immutable shares if they present valid passes.
valid_passes = self._validate_passes(
allocate_buckets_message(storage_index),
passes,
)
check_pass_quantity_for_write(len(valid_passes), sharenums, allocated_size)
return self._original.remote_allocate_buckets(
storage_index,
renew_secret,
cancel_secret,
sharenums,
allocated_size,
canary,
)
def remote_get_buckets(self, storage_index):
Pass-through without pass check to let clients read immutable shares as
return self._original.remote_get_buckets(storage_index)
def remote_add_lease(self, passes, storage_index, *a, **kw):
Pass-through after a pass check to ensure clients can only extend the
duration of share storage if they present valid passes.
# print("server add_lease({}, {!r})".format(len(passes), storage_index))
valid_passes = self._validate_passes(add_lease_message(storage_index), passes)
check_pass_quantity_for_lease(
storage_index,
valid_passes,
self._original,
)
return self._original.remote_add_lease(storage_index, *a, **kw)
def remote_renew_lease(self, passes, storage_index, *a, **kw):
Pass-through after a pass check to ensure clients can only extend the
duration of share storage if they present valid passes.
valid_passes = self._validate_passes(renew_lease_message(storage_index), passes)
check_pass_quantity_for_lease(
storage_index,
valid_passes,
self._original,
)
return self._original.remote_renew_lease(storage_index, *a, **kw)
def remote_advise_corrupt_share(self, *a, **kw):
Pass-through without a pass check to let clients inform us of possible
issues with the system without incurring any cost to themselves.
"""
return self._original.remote_advise_corrupt_share(*a, **kw)
def remote_share_sizes(self, storage_index_or_slot, sharenums):
get_share_sizes(self._original, storage_index_or_slot, sharenums)
def remote_stat_shares(self, storage_indexes_or_slots):
return list(
dict(stat_share(self._original, storage_index_or_slot))
for storage_index_or_slot
in storage_indexes_or_slots
)
def remote_slot_testv_and_readv_and_writev(
self,
storage_index,
secrets,
tw_vectors,
r_vector,
):
Pass-through after a pass check to ensure clients can only allocate
storage for mutable shares if they present valid passes.
:note: This method can be used both to allocate storage and to rewrite
data in already-allocated storage. These cases may not be the
same from the perspective of pass validation.
# Only writes to shares without an active lease will result in a lease
# renewal.
renew_leases = False
if has_writes(tw_vectors):
Jean-Paul Calderone
committed
# Passes may be supplied with the write to create the
# necessary lease as part of the same operation. This must be
# supported because there is no separate protocol action to
# *create* a slot. Clients just begin writing to it.
valid_passes = self._validate_passes(
slot_testv_and_readv_and_writev_message(storage_index),
passes,
)
if has_active_lease(self._original, storage_index, self._clock.seconds()):
# Some of the storage is paid for already.
self._original,
storage_index,
tw_vectors.keys(),
))
# print("has writes, has active lease, current sizes: {}".format(current_sizes))
Jean-Paul Calderone
committed
else:
# None of it is.
current_sizes = {}
renew_leases = True
required_new_passes = get_required_new_passes_for_mutable_write(
current_sizes,
tw_vectors,
)
if required_new_passes > len(valid_passes):
raise MorePassesRequired(len(valid_passes), required_new_passes)
# Skip over the remotely exposed method and jump to the underlying
# implementation which accepts one additional parameter that we know
# about (and don't expose over the network): renew_leases. We always
# pass False for this because we want to manage leases completely
# separately from writes.
return self._original.slot_testv_and_readv_and_writev(
storage_index,
secrets,
tw_vectors,
r_vector,
renew_leases=renew_leases,
def remote_slot_readv(self, *a, **kw):
Pass-through without a pass check to let clients read mutable shares as
return self._original.remote_slot_readv(*a, **kw)
def has_active_lease(storage_server, storage_index, now):
"""
:param allmydata.storage.server.StorageServer storage_server: A storage
server to use to look up lease information.
:param bytes storage_index: A storage index to use to look up lease
information.
:param float now: The current time as a POSIX timestamp.
:return bool: ``True`` if any only if the given storage index has a lease
with an expiration time after ``now``.
"""
leases = storage_server.get_slot_leases(storage_index)
return any(
lease.get_expiration_time() > now
for lease
in leases
)
def check_pass_quantity_for_lease(storage_index, valid_passes, storage_server):
"""
Check that the given number of passes is sufficient to add or renew a
lease for one period for the given storage index.
"""
allocated_sizes = dict(
get_share_sizes(
storage_server,
storage_index,
list(get_all_share_numbers(storage_server, storage_index)),
),
).values()
# print("allocated_sizes: {}".format(allocated_sizes))
check_pass_quantity(len(valid_passes), allocated_sizes)
# print("Checked out")
"""
Check that the given number of passes is sufficient to cover leases for
one period for shares of the given sizes.
:param int valid_count: The number of passes.
:param list[int] share_sizes: The sizes of the shares for which the lease
is being created.
:raise MorePassesRequired: If the given number of passes is too few for
the given share sizes.
:return: ``None`` if the given number of passes is sufficient.
"""
required_pass_count = required_passes(BYTES_PER_PASS, share_sizes)
if valid_count < required_pass_count:
raise MorePassesRequired(
valid_count,
required_pass_count,
)
def check_pass_quantity_for_write(valid_count, sharenums, allocated_size):
"""
Determine if the given number of valid passes is sufficient for an
attempted write.
:param int valid_count: The number of valid passes to consider.
:param set[int] sharenums: The shares being written to.
:param int allocated_size: The size of each share.
:raise MorePassedRequired: If the number of valid passes given is too
small.
:return: ``None`` if the number of valid passes given is sufficient.
"""
check_pass_quantity(valid_count, [allocated_size] * len(sharenums))
Jean-Paul Calderone
committed
def get_all_share_paths(storage_server, storage_index):
"""
Get the paths of all shares in the given storage index (or slot).
:param allmydata.storage.server.StorageServer storage_server: The storage
server which owns the storage index.
:param bytes storage_index: The storage index (or slot) in which to look
up shares.
:return: A generator of tuples of (int, bytes) giving a share number and
the path to storage for that share number.
"""
Jean-Paul Calderone
committed
bucket = join(storage_server.sharedir, storage_index_to_dir(storage_index))
try:
contents = listdir(bucket)
except OSError as e:
if e.errno == ENOENT:
return
raise
for candidate in contents:
Jean-Paul Calderone
committed
try:
sharenum = int(candidate)
except ValueError:
pass
else:
yield sharenum, join(bucket, candidate)
def get_all_share_numbers(storage_server, storage_index):
"""
Get all share numbers in the given storage index (or slot).
:param allmydata.storage.server.StorageServer storage_server: The storage
server which owns the storage index.
:param bytes storage_index: The storage index (or slot) in which to look
up share numbers.
:return: A generator of int giving share numbers.
"""
for sharenum, sharepath in get_all_share_paths(storage_server, storage_index):
yield sharenum
def get_share_sizes(storage_server, storage_index_or_slot, sharenums):
"""
Get the sizes of the given share numbers for the given storage index *or*
slot.
:param allmydata.storage.server.StorageServer storage_server: The storage
server which owns the storage index.
:param bytes storage_index_or_slot: The storage index (or slot) in which
to look up share numbers.
:param sharenums: A container of share numbers to use to filter the
results. Only information about share numbers in this container is
included in the result. Or, ``None`` to get sizes for all shares
which exist.
:return: A generator of tuples of (int, int) where the first element is a
share number and the second element is the data size for that share
number.
for sharenum, sharepath in get_all_share_paths(storage_server, storage_index_or_slot):
if stat is None:
stat = get_stat(sharepath)
if sharenums is None or sharenum in sharenums:
yield sharenum, stat(storage_server, storage_index_or_slot, sharepath).size
def get_storage_index_share_size(sharepath):
"""
Get the size of a share belonging to a storage index (an immutable share).
:param bytes sharepath: The path to the share file.
:return int: The data size of the share in bytes.
"""
with open(sharepath) as share_file:
share_data_length_bytes = share_file.read(8)[4:]
(share_data_length,) = unpack('>L', share_data_length_bytes)
return share_data_length
def get_lease_expiration(get_leases, storage_index_or_slot):
for lease in get_leases(storage_index_or_slot):
return lease.get_expiration_time()
return None
def stat_bucket(storage_server, storage_index, sharepath):
return ShareStat(
size=get_storage_index_share_size(sharepath),
lease_expiration=get_lease_expiration(storage_server.get_leases, storage_index),
)
def stat_slot(storage_server, slot, sharepath):
return ShareStat(
size=get_slot_share_size(sharepath),
lease_expiration=get_lease_expiration(storage_server.get_slot_leases, slot),
)
"""
Get the size of a share belonging to a slot (a mutable share).
:param bytes sharepath: The path to the share file.
:return int: The data size of the share in bytes.
"""
with open(sharepath) as share_file:
share_data_length_bytes = share_file.read(92)[-8:]
(share_data_length,) = unpack('>Q', share_data_length_bytes)
return share_data_length
Jean-Paul Calderone
committed
def stat_share(storage_server, storage_index_or_slot):
stat = None
for sharenum, sharepath in get_all_share_paths(storage_server, storage_index_or_slot):
if stat is None:
stat = get_stat(sharepath)
yield (sharenum, stat(storage_server, storage_index_or_slot, sharepath))
def get_stat(sharepath):
# Figure out if it is a storage index or a slot.
with open(sharepath) as share_file:
magic = share_file.read(32)
if magic == "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e":
return stat_slot
else:
return stat_bucket
# I don't understand why this is required.
# ZKAPAuthorizerStorageServer is-a Referenceable. It seems like
# the built in adapter should take care of this case.
from twisted.python.components import (
registerAdapter,
)
from foolscap.referenceable import (
ReferenceableSlicer,
)
from foolscap.ipb import (
ISlicer,
)
registerAdapter(ReferenceableSlicer, ZKAPAuthorizerStorageServer, ISlicer)