"...PrivateStorageio.git" did not exist on "e7e01ed43695001129bad9f3a0fa2a1dc2ad115c"
Newer
Older
# Copyright 2019 PrivateStorage.io, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
A Tahoe-LAFS RIStorageServer-alike which authorizes writes and lease
updates using per-call passes.

This is the server part of a storage access protocol.  The client part is
implemented in ``_storage_client.py``.
"""
from __future__ import (
absolute_import,
)
from errno import (
ENOENT,
)
from functools import (
partial,
)
Jean-Paul Calderone
committed
from os.path import (
join,
)
from os import (
listdir,
stat,
)
import attr
from attr.validators import (
provides,
instance_of,
from foolscap.ipb import (
IReferenceable,
IRemotelyCallable,
)
from allmydata.interfaces import (
RIStorageServer,
)
Jean-Paul Calderone
committed
from allmydata.storage.common import (
storage_index_to_dir,
)
from privacypass import (
TokenPreimage,
VerificationSignature,
SigningKey,
)
from twisted.python.reflect import (
namedAny,
)
from twisted.internet.interfaces import (
IReactorTime,
)
from .foolscap import (
from .storage_common import (
BYTES_PER_PASS,
required_passes,
allocate_buckets_message,
add_lease_message,
renew_lease_message,
slot_testv_and_readv_and_writev_message,
Jean-Paul Calderone
committed
get_sharenums,
get_allocated_size,
get_implied_data_length,
class MorePassesRequired(Exception):
    """
    Storage operations fail with ``MorePassesRequired`` when they are not
    accompanied by a sufficient number of valid passes.

    :ivar int valid_count: The number of valid passes presented in the
        operation.

    :ivar int required_count: The number of valid passes which must be
        presented for the operation to be authorized.
    """
    def __init__(self, valid_count, required_count):
        self.valid_count = valid_count
        self.required_count = required_count

    def __repr__(self):
        # The name rendered here must match the class name so that log
        # output and remote failure strings can be traced back to this type.
        return "MorePassesRequired(valid_count={}, required_count={})".format(
            self.valid_count,
            self.required_count,
        )

    def __str__(self):
        # The string form is deliberately the same as the repr so both
        # counts are visible wherever the exception is rendered.
        return repr(self)
class LeaseRenewalRequired(Exception):
    """
    Raised by a mutable write when the target slot has no active lease and
    the write was not accompanied by passes with which to create one.
    """
@implementer_only(RIPrivacyPassAuthorizedStorageServer, IReferenceable, IRemotelyCallable)
# It would be great to use `frozen=True` (value-based hashing) instead of
# `cmp=False` (identity based hashing) but Referenceable wants to set some
# attributes on self and it's hard to avoid that.
@attr.s(cmp=False)
class ZKAPAuthorizerStorageServer(Referenceable):
    """
    A class which wraps an ``RIStorageServer`` to insert pass validity checks
    in front of the operations which allocate or extend storage.

    :ivar _original: The wrapped ``RIStorageServer`` provider to which every
        operation is ultimately delegated.

    :ivar SigningKey _signing_key: The key used to re-derive unblinded tokens
        when checking pass signatures.

    :ivar IReactorTime _clock: The source of the current time used for lease
        expiration checks.  Defaults to the global Twisted reactor.
    """
    _original = attr.ib(validator=provides(RIStorageServer))
    _signing_key = attr.ib(validator=instance_of(SigningKey))
    _clock = attr.ib(
        validator=provides(IReactorTime),
        default=attr.Factory(partial(namedAny, "twisted.internet.reactor")),
    )

    def _is_invalid_pass(self, message, pass_):
        """
        Check the validity of a single pass.

        :param unicode message: The shared message for pass validation.

        :param bytes pass_: The encoded pass to validate.

        :return bool: ``False`` if the pass includes a valid signature for
            ``message`` (the pass is valid), ``True`` if the signature does
            not verify or the pass cannot be decoded at all (the pass is
            invalid).
        """
        assert isinstance(message, unicode), "message %r not unicode" % (message,)
        assert isinstance(pass_, bytes), "pass %r not bytes" % (pass_,)
        try:
            preimage_base64, signature_base64 = pass_.split(b" ")
            preimage = TokenPreimage.decode_base64(preimage_base64)
            proposed_signature = VerificationSignature.decode_base64(signature_base64)
            unblinded_token = self._signing_key.rederive_unblinded_token(preimage)
            verification_key = unblinded_token.derive_verification_key_sha512()
            invalid_pass = verification_key.invalid_sha512(proposed_signature, message.encode("utf-8"))
            return invalid_pass
        except Exception:
            # Anything which prevents the check from completing (malformed
            # encoding, wrong number of fields, ...) means the pass cannot be
            # accepted.
            #
            # It would be pretty nice to log something here, sometimes, I guess?
            return True

    def _validate_passes(self, message, passes):
        """
        Check all of the given passes for validity.

        :param unicode message: The shared message for pass validation.

        :param list[bytes] passes: The encoded passes to validate.

        :return list[bytes]: The passes which are found to be valid.
        """
        return [
            pass_
            for pass_
            in passes
            if not self._is_invalid_pass(message, pass_)
        ]

    def remote_get_version(self):
        """
        Pass-through without pass check to allow clients to learn about our
        version and configuration in case it helps them decide how to behave.
        """
        return self._original.remote_get_version()

    def remote_allocate_buckets(self, passes, storage_index, renew_secret, cancel_secret, sharenums, allocated_size, canary):
        """
        Pass-through after a pass check to ensure that clients can only allocate
        storage for immutable shares if they present valid passes.

        :raise MorePassesRequired: If too few of ``passes`` are valid for the
            requested ``sharenums`` and ``allocated_size``.
        """
        valid_passes = self._validate_passes(
            allocate_buckets_message(storage_index),
            passes,
        )
        check_pass_quantity_for_write(len(valid_passes), sharenums, allocated_size)

        return self._original.remote_allocate_buckets(
            storage_index,
            renew_secret,
            cancel_secret,
            sharenums,
            allocated_size,
            canary,
        )

    def remote_get_buckets(self, storage_index):
        """
        Pass-through without pass check to let clients read immutable shares
        without spending any passes.
        """
        return self._original.remote_get_buckets(storage_index)

    def remote_add_lease(self, passes, storage_index, *a, **kw):
        """
        Pass-through after a pass check to ensure clients can only extend the
        duration of share storage if they present valid passes.
        """
        # NOTE(review): the list of valid passes is discarded, so no minimum
        # number of valid passes is enforced here.  Confirm the intended
        # policy and add a quantity check as the write paths have.
        self._validate_passes(add_lease_message(storage_index), passes)
        return self._original.remote_add_lease(storage_index, *a, **kw)

    def remote_renew_lease(self, passes, storage_index, *a, **kw):
        """
        Pass-through after a pass check to ensure clients can only extend the
        duration of share storage if they present valid passes.
        """
        # NOTE(review): the list of valid passes is discarded, so no minimum
        # number of valid passes is enforced here.  Confirm the intended
        # policy and add a quantity check as the write paths have.
        self._validate_passes(renew_lease_message(storage_index), passes)
        return self._original.remote_renew_lease(storage_index, *a, **kw)

    def remote_advise_corrupt_share(self, *a, **kw):
        """
        Pass-through without a pass check to let clients inform us of possible
        issues with the system without incurring any cost to themselves.
        """
        return self._original.remote_advise_corrupt_share(*a, **kw)

    def remote_slot_share_sizes(self, storage_index, sharenums):
        """
        Report the total on-disk size of some shares of a slot, without a
        pass check.

        :param bytes storage_index: The storage index of the slot.

        :param sharenums: The share numbers to include in the total.

        :return: The combined size in bytes of the requested shares, or
            ``None`` if inspecting the slot fails with ``ENOENT``.
        """
        try:
            return get_slot_share_size(self._original, storage_index, sharenums)
        except OSError as e:
            if e.errno == ENOENT:
                return None
            raise

    def remote_slot_testv_and_readv_and_writev(
            self,
            passes,
            storage_index,
            secrets,
            tw_vectors,
            r_vector,
    ):
        """
        Pass-through after a pass check to ensure clients can only allocate
        storage for mutable shares if they present valid passes.

        :note: This method can be used both to allocate storage and to rewrite
            data in already-allocated storage.  These cases may not be the
            same from the perspective of pass validation.

        :raise MorePassesRequired: If the operation includes writes and too
            few of ``passes`` are valid to pay for them.
        """
        renew_leases = False

        if has_writes(tw_vectors):
            # Passes may be supplied with the write to create the
            # necessary lease as part of the same operation.  This must be
            # supported because there is no separate protocol action to
            # *create* a slot.  Clients just begin writing to it.
            valid_passes = self._validate_passes(
                slot_testv_and_readv_and_writev_message(storage_index),
                passes,
            )
            if has_active_lease(self._original, storage_index, self._clock.seconds()):
                # There is an active lease already so only growth beyond the
                # size currently on disk has to be paid for.
                current_length = get_slot_share_size(self._original, storage_index, tw_vectors.keys())
                # The lengths implied for every share are summed and then
                # presented to ``required_passes`` as the size of a single
                # share (``{0}``), mirroring how ``current_length`` is one
                # total across all of the shares.
                new_length = sum(
                    (
                        get_implied_data_length(data_vector, length)
                        for (_, data_vector, length)
                        in tw_vectors.values()
                    ),
                    0,
                )
                required_new_passes = (
                    required_passes(BYTES_PER_PASS, {0}, new_length)
                    - required_passes(BYTES_PER_PASS, {0}, current_length)
                )
                if required_new_passes > len(valid_passes):
                    raise MorePassesRequired(len(valid_passes), required_new_passes)
            else:
                # No active lease: the passes must cover the whole write and
                # the write itself is allowed to create the lease.
                check_pass_quantity_for_mutable_write(len(valid_passes), tw_vectors)
                renew_leases = True

        # Skip over the remotely exposed method and jump to the underlying
        # implementation which accepts one additional parameter that we know
        # about (and don't expose over the network): renew_leases.  It is
        # True only when the passes checked above paid for a write to a slot
        # without an active lease; otherwise leases are managed separately
        # from writes.
        return self._original.slot_testv_and_readv_and_writev(
            storage_index,
            secrets,
            tw_vectors,
            r_vector,
            renew_leases=renew_leases,
        )

    def remote_slot_readv(self, *a, **kw):
        """
        Pass-through without a pass check to let clients read mutable shares
        without spending any passes.
        """
        return self._original.remote_slot_readv(*a, **kw)
def has_active_lease(storage_server, storage_index, now):
    """
    Determine whether a slot currently holds an unexpired lease.

    :param allmydata.storage.server.StorageServer storage_server: A storage
        server to use to look up lease information.

    :param bytes storage_index: A storage index to use to look up lease
        information.

    :param float now: The current time as a POSIX timestamp.

    :return bool: ``True`` if and only if the given storage index has a lease
        with an expiration time after ``now``.
    """
    for candidate in storage_server.get_slot_leases(storage_index):
        if candidate.get_expiration_time() > now:
            # One unexpired lease is enough; no need to look at the rest.
            return True
    return False
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
def check_pass_quantity_for_write(valid_count, sharenums, allocated_size):
    """
    Verify that enough valid passes were presented to pay for a write.

    :param int valid_count: The number of valid passes to consider.

    :param set[int] sharenums: The shares being written to.

    :param int allocated_size: The size of each share.

    :raise MorePassesRequired: If the number of valid passes given is too
        small.

    :return: ``None`` if the number of valid passes given is sufficient.
    """
    needed = required_passes(BYTES_PER_PASS, sharenums, allocated_size)
    if needed > valid_count:
        raise MorePassesRequired(valid_count, needed)
def check_pass_quantity_for_mutable_write(valid_count, tw_vectors):
    """
    Verify that enough valid passes were presented to pay for a write to a
    slot.

    The share numbers and the allocated size are both derived from the
    test/write vectors and then checked exactly as for any other write.

    :param int valid_count: The number of valid passes to consider.

    :param tw_vectors: See
        ``allmydata.interfaces.TestAndWriteVectorsForShares``.
    """
    check_pass_quantity_for_write(
        valid_count,
        get_sharenums(tw_vectors),
        get_allocated_size(tw_vectors),
    )
Jean-Paul Calderone
committed
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
def get_slot_share_size(storage_server, storage_index, sharenums):
    """
    Total the on-disk storage committed to the given shares in the given
    storage index.

    :param allmydata.storage.server.StorageServer storage_server: The storage
        server which owns the on-disk storage.

    :param bytes storage_index: The storage index to inspect.

    :param list[int] sharenums: The share numbers to consider.

    :raise OSError: If the directory for the storage index cannot be listed
        (for example, ``ENOENT`` if it does not exist) or if inspecting one
        of the requested shares fails for a reason other than the share
        having disappeared.

    :return int: The number of bytes the given shares use on disk.  Note this
        is naive with respect to filesystem features like compression or
        sparse files.  It is just a total of the size reported by the
        filesystem.
    """
    # Build the lookup once so each directory entry is an O(1) check.
    wanted = set(sharenums)
    bucket = join(storage_server.sharedir, storage_index_to_dir(storage_index))
    total = 0
    for candidate in listdir(bucket):
        try:
            sharenum = int(candidate)
        except ValueError:
            # Entries which are not named by a share number are not shares.
            continue
        if sharenum not in wanted:
            continue
        try:
            metadata = stat(join(bucket, candidate))
        except OSError as e:
            if e.errno != ENOENT:
                # A real I/O problem must not be silently turned into an
                # undercounted total.
                raise
            # The share was removed between listdir() and stat(); it no
            # longer occupies any storage.
            continue
        total += metadata.st_size
    return total
# Register ``ReferenceableSlicer`` as the ``ISlicer`` adapter for
# ``ZKAPAuthorizerStorageServer``.
#
# I don't understand why this is required.
# ZKAPAuthorizerStorageServer is-a Referenceable. It seems like
# the built in adapter should take care of this case.
from twisted.python.components import (
registerAdapter,
)
from foolscap.referenceable import (
ReferenceableSlicer,
)
from foolscap.ipb import (
ISlicer,
)
registerAdapter(ReferenceableSlicer, ZKAPAuthorizerStorageServer, ISlicer)