Newer
Older
# Copyright 2019 PrivateStorage.io, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
The Twisted plugin that glues the Zero-Knowledge Access Pass system into
Tahoe-LAFS.
"""
import random
from datetime import datetime
from weakref import WeakValueDictionary
from allmydata.client import _Client
from allmydata.interfaces import IAnnounceableStorageServer, IFoolscapStoragePlugin
from allmydata.node import MissingConfigEntry
from challenge_bypass_ristretto import PublicKey, SigningKey
from eliot import start_action
from prometheus_client import CollectorRegistry, write_to_textfile
from twisted.internet import task
from twisted.internet.defer import succeed
from twisted.logger import Logger
from twisted.python.filepath import FilePath
from zope.interface import implementer
from .api import ZKAPAuthorizerStorageClient, ZKAPAuthorizerStorageServer
from .config import lease_maintenance_from_tahoe_config
from .lease_maintenance import (
SERVICE_NAME,
lease_maintenance_service,
maintain_leases_from_root,
)
from .model import VoucherStore
from .resource import from_configuration as resource_from_configuration
from .server.spending import get_spender
from .spending import SpendingController
from .storage_common import BYTES_PER_PASS, get_configured_pass_value
# Module-level Twisted logger; used below when attaching the lease
# maintenance service to a client node.
_log = Logger()
@implementer(IAnnounceableStorageServer)
@attr.s
class AnnounceableStorageServer(object):
    """
    A plain record pairing a storage server with the announcement that goes
    with it.  The class is declared as an implementation of
    ``IAnnounceableStorageServer``.
    """

    # The announcement for the storage server.  ``get_storage_server`` in
    # this module supplies a dict here.
    announcement = attr.ib()
    # The storage server object the announcement refers to.
    storage_server = attr.ib()
@implementer(IFoolscapStoragePlugin)
@attr.s
class ZKAPAuthorizer(object):
    """
    A storage plugin which provides a token-based access control mechanism on
    top of the Tahoe-LAFS built-in storage server interface.

    :ivar WeakValueDictionary _stores: A mapping from node directories to this
        plugin's database connections for those nodes.  The existence of any
        kind of attribute to reference database connections (not so much the
        fact that it is a WeakValueDictionary; if it were just a weakref the
        same would be true) probably reflects an error in the interface which
        forces different methods to use instance state to share a database
        connection.
    """

    name = attr.ib(default=u"privatestorageio-zkapauthz-v1")
    _stores = attr.ib(default=attr.Factory(WeakValueDictionary))

    def _get_store(self, node_config):
        """
        :return VoucherStore: The database for the given node.  At most one
            connection is made to the database per ``ZKAPAuthorizer`` instance.
        """
        # ``_stores`` is keyed by node directory, so use the node's
        # configuration directory as the cache key.
        key = node_config.get_config_path()
        try:
            s = self._stores[key]
        except KeyError:
            s = VoucherStore.from_node_config(node_config, datetime.now)
            self._stores[key] = s
        return s

    def _get_redeemer(self, node_config, announcement, reactor):
        """
        :return IRedeemer: The voucher redeemer indicated by the given
            configuration.  A new instance is returned on every call because
            the redeemer interface is stateless.
        """
        # NOTE(review): ``get_redeemer`` is not among the imports visible in
        # this module -- confirm it is imported at file level.
        return get_redeemer(self.name, node_config, announcement, reactor)

    def get_storage_server(
        self, configuration, get_anonymous_storage_server, reactor=None
    ):
        """
        Build the pass-enforcing storage server for a storage node.

        :param configuration: The plugin configuration mapping.  It must
            contain ``ristretto-issuer-root-url`` and
            ``ristretto-signing-key-path``.  It may contain ``pass-value``,
            ``prometheus-metrics-interval`` and ``prometheus-metrics-path``.
            Whatever is left after those items are removed is handed to
            ``get_spender`` and ``ZKAPAuthorizerStorageServer``.  The mapping
            itself is copied, not modified.

        :param get_anonymous_storage_server: A no-argument callable returning
            the storage server that is to be wrapped.

        :param reactor: The reactor used to schedule metrics writes and
            passed to ``get_spender``.  The global reactor is used if this is
            ``None``.

        :return: The result of ``succeed`` applied to an
            ``AnnounceableStorageServer`` holding the announcement and the
            wrapping storage server.
        """
        if reactor is None:
            from twisted.internet import reactor
        registry = CollectorRegistry()
        kwargs = configuration.copy()

        # If metrics are desired, schedule their writing to disk.
        metrics_interval = kwargs.pop(u"prometheus-metrics-interval", None)
        metrics_path = kwargs.pop(u"prometheus-metrics-path", None)
        if metrics_interval is not None and metrics_path is not None:
            FilePath(metrics_path).parent().makedirs(ignoreExistingDirectory=True)
            t = task.LoopingCall(make_safe_writer(metrics_path, registry))
            t.clock = reactor
            t.start(int(metrics_interval))

        root_url = kwargs.pop(u"ristretto-issuer-root-url")
        pass_value = int(kwargs.pop(u"pass-value", BYTES_PER_PASS))
        signing_key = load_signing_key(
            FilePath(
                kwargs.pop(u"ristretto-signing-key-path"),
            ),
        )
        public_key = PublicKey.from_signing_key(signing_key)
        announcement = {
            u"ristretto-issuer-root-url": root_url,
            u"ristretto-public-keys": [public_key.encode_base64()],
        }
        anonymous_storage_server = get_anonymous_storage_server()
        spender = get_spender(
            config=kwargs,
            reactor=reactor,
            registry=registry,
        )
        storage_server = ZKAPAuthorizerStorageServer(
            anonymous_storage_server,
            pass_value=pass_value,
            signing_key=signing_key,
            spender=spender,
            registry=registry,
            **kwargs
        )
        return succeed(
            AnnounceableStorageServer(
                announcement,
                storage_server,
            ),
        )

    def get_storage_client(self, node_config, announcement, get_rref):
        """
        Create an ``IStorageClient`` that submits ZKAPs with certain requests in
        order to authorize them.  The ZKAPs are extracted from the database
        managed by this plugin in the node directory that goes along with
        ``node_config``.
        """
        from twisted.internet import reactor

        redeemer = self._get_redeemer(node_config, announcement, reactor)
        store = self._get_store(node_config)
        controller = SpendingController.for_store(
            tokens_to_passes=redeemer.tokens_to_passes,
            store=store,
        )
        # NOTE(review): the final argument was missing from the extracted
        # source.  The spending controller is otherwise unused, so its
        # ``get`` method is presumably what supplies passes to the client --
        # confirm against ``ZKAPAuthorizerStorageClient``.
        return ZKAPAuthorizerStorageClient(
            get_configured_pass_value(node_config),
            get_rref,
            controller.get,
        )

    def get_client_resource(self, node_config, reactor=None):
        """
        Get an ``IZKAPRoot`` for the given node configuration.

        :param allmydata.node._Config node_config: The configuration object
            for the relevant node.

        :param reactor: The reactor given to the redeemer and used as the
            resource's clock.  The global reactor is used if this is ``None``.
        """
        if reactor is None:
            from twisted.internet import reactor
        return resource_from_configuration(
            node_config,
            store=self._get_store(node_config),
            redeemer=self._get_redeemer(node_config, None, reactor),
            clock=reactor,
        )
def make_safe_writer(metrics_path, registry):
    # type: (str, CollectorRegistry) -> Callable[[], None]
    """
    Make a no-argument callable that writes metrics from the given registry to
    the given path.  The callable will log errors writing to the path and not
    raise exceptions.

    :param metrics_path: The path of the text file to write.

    :param registry: The registry whose metrics are written.

    :return: The no-argument writer callable.
    """

    def safe_writer():
        try:
            with start_action(
                action_type=u"zkapauthorizer:metrics:write-to-textfile",
                metrics_path=metrics_path,
            ):
                write_to_textfile(metrics_path, registry)
        except Exception:
            # Deliberately swallowed: a failed write must not stop the
            # periodic call.  The enclosing action is relied on to record
            # the failure in the log.
            pass

    # Hand the writer back to the caller; without this the function would
    # yield ``None`` instead of the promised callable.
    return safe_writer
# Hold on to the unpatched ``init_storage`` so the replacement defined below
# can delegate to it.
_init_storage = _Client.__dict__["init_storage"]


def maintenance_init_storage(self, announceable_storage_servers):
    """
    Replacement for ``_Client.init_storage``: run the original implementation
    and afterwards make sure the client node has a lease maintenance service.

    :param announceable_storage_servers: Passed through unchanged to the
        original ``init_storage``.

    :return: Whatever the original ``init_storage`` returned.
    """
    from twisted.internet import reactor

    original_result = _init_storage(self, announceable_storage_servers)
    _maybe_attach_maintenance_service(reactor, self)
    return original_result


# Install the wrapper in place of the original method.
_Client.init_storage = maintenance_init_storage
def _maybe_attach_maintenance_service(reactor, client_node):
    """
    Check for an existing lease maintenance service and if one is not found,
    create one.

    :param reactor: The reactor handed to the newly created service.

    :param allmydata.client._Client client_node: The client node to check and,
        possibly, modify.  A lease maintenance service is added to it if and
        only if one is not already present.
    """
    try:
        # If there is already one we don't need another.
        client_node.getServiceNamed(SERVICE_NAME)
    except KeyError:
        # There isn't one so make it and add it.
        _log.info("Creating new lease maintenance service")
        try:
            _create_maintenance_service(
                reactor,
                client_node.config,
                client_node,
            ).setServiceParent(client_node)
        except Exception:
            # ``Logger.failure`` reports the exception currently being
            # handled, so it may only be called from inside an ``except``
            # block -- never on the success path.
            _log.failure("Attaching maintenance service to client node")
    else:
        _log.info("Found existing lease maintenance service")
def _create_maintenance_service(reactor, node_config, client_node):
    """
    Create a lease maintenance service to be attached to the given client
    node.

    :param reactor: Supplies the current time through ``seconds()`` and is
        passed on to ``lease_maintenance_service``.

    :param allmydata.node._Config node_config: The configuration for the node
        the lease maintenance service will be attached to.

    :param allmydata.client._Client client_node: The client node the lease
        maintenance service will be attached to.

    :return: The value returned by ``lease_maintenance_service``.
    """

    def get_now():
        # Current time as a naive UTC datetime, read from the reactor's clock.
        return datetime.utcfromtimestamp(reactor.seconds())

    # NOTE(review): ``storage_server`` is neither defined nor imported in the
    # visible part of this module -- confirm where it is meant to come from.
    store = storage_server._get_store(node_config)
    maint_config = lease_maintenance_from_tahoe_config(node_config)
    # Create the operation which performs the lease maintenance job when
    # called.
    # NOTE(review): ``partial`` is not among the imports visible in this
    # module -- confirm ``functools.partial`` is imported at file level.
    maintain_leases = maintain_leases_from_root(
        get_root_nodes=partial(get_root_nodes, client_node, node_config),
        storage_broker=client_node.get_storage_broker(),
        secret_holder=client_node._secret_holder,
        min_lease_remaining=maint_config.min_lease_remaining,
        progress=store.start_lease_maintenance,
        get_now=get_now,
    )
    # NOTE(review): the result of this call is discarded.  Presumably the
    # path was meant to be bound to a name and passed to the service created
    # below -- confirm against ``lease_maintenance_service``'s signature.
    node_config.get_private_path(u"last-lease-maintenance-run")
    # Create the service to periodically run the lease maintenance operation.
    return lease_maintenance_service(
        maintain_leases,
        reactor,
        random,
        lease_maint_config=maint_config,
    )
def get_root_nodes(client_node, node_config):
    """
    Get the filesystem nodes lease maintenance should start from.

    :param client_node: An object whose ``create_node_from_uri`` turns a
        capability into a node.

    :param node_config: An object whose ``get_private_config`` is asked for
        the ``rootcap`` entry.

    :return list: A one-element list holding the node for the configured
        ``rootcap``, or an empty list if there is no such entry.
    """
    try:
        cap = node_config.get_private_config(b"rootcap")
    except MissingConfigEntry:
        # Nothing configured means nothing to maintain.
        return []
    # Node creation stays outside the ``try`` so that errors it raises are
    # never mistaken for a missing configuration entry.
    return [client_node.create_node_from_uri(cap)]
def load_signing_key(path):
    """
    Load a Ristretto signing key stored, base64-encoded, in a file.

    ``challenge_bypass_ristretto.SigningKey.decode_base64`` is given the
    file's content with leading and trailing whitespace removed, so a
    trailing newline in the key file is harmless.

    :param FilePath path: The path from which to read the key.

    :raise challenge_bypass_ristretto.DecodeException: Passed through
        unchanged if ``SigningKey.decode_base64`` raises it.

    :return challenge_bypass_ristretto.SigningKey: An object representing the
        key read.
    """
    encoded_key = path.getContent().strip()
    return SigningKey.decode_base64(encoded_key)