# Copyright 2019 PrivateStorage.io, LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Tests for the web resource provided by the client part of the Tahoe-LAFS plugin. """ from datetime import datetime from io import BytesIO from typing import Optional, Set from urllib.parse import quote import attr from allmydata.client import config_from_string from aniso8601 import parse_datetime from fixtures import TempDir from hypothesis import given, note from hypothesis.strategies import ( SearchStrategy, binary, builds, datetimes, dictionaries, fixed_dictionaries, integers, just, lists, none, one_of, sampled_from, text, tuples, ) from openapi_spec_validator import validate_spec from openapi_spec_validator.readers import read_from_filename from testtools import TestCase from testtools.content import text_content from testtools.matchers import ( AfterPreprocessing, Always, ContainsDict, Equals, GreaterThan, Is, IsInstance, MatchesAll, MatchesAny, MatchesStructure, ) from testtools.twistedsupport import CaptureTwistedLogs, succeeded from treq.testing import RequestTraversalAgent from twisted.internet.task import Clock, Cooperator from twisted.python.filepath import FilePath from twisted.web.client import FileBodyProducer, readBody from twisted.web.http import BAD_REQUEST, NOT_FOUND, NOT_IMPLEMENTED, OK, UNAUTHORIZED from twisted.web.http_headers import Headers from twisted.web.resource import IResource, getChildForRequest from .. import __file__ as package_init_file from .. import __version__ as zkapauthorizer_version from .._base64 import urlsafe_b64decode from .._json import dumps_utf8 from ..api import NAME from ..configutil import config_string_from_sections from ..model import ( DoubleSpend, Error, Redeemed, Redeeming, Unpaid, Voucher, VoucherStore, memory_connect, ) from ..pricecalculator import PriceCalculator from ..resource import NUM_TOKENS, from_configuration, get_token_count from ..storage_common import ( get_configured_allowed_public_keys, get_configured_pass_value, required_passes, ) from .json import loads from .matchers import Provides, between, matches_response from .strategies import ( api_auth_tokens, client_doublespendredeemer_configurations, client_dummyredeemer_configurations, client_errorredeemer_configurations, client_nonredeemer_configurations, client_unpaidredeemer_configurations, direct_tahoe_configs, posix_timestamps, request_paths, requests, share_parameters, tahoe_configs, vouchers, ) TRANSIENT_ERROR = "something went wrong, who knows what" # Helper to work-around https://github.com/twisted/treq/issues/161 def uncooperator(started=True): return Cooperator( # Don't stop consuming the iterator until it's done. terminationPredicateFactory=lambda: lambda: False, scheduler=lambda what: (what(), object())[1], started=started, ) def is_not_json(bytestring): """ :param bytes bytestring: A candidate byte string to inspect. :return bool: ``False`` if and only if ``bytestring`` is JSON encoded. """ try: loads(bytestring) except: return True return False def not_vouchers(): """ Builds byte strings which are not legal vouchers. """ return one_of( text() .filter( lambda t: (not is_urlsafe_base64(t)), ) .map(lambda t: t.encode("utf-8")), vouchers().map( # Turn a valid voucher into a voucher that is invalid only by # containing a character from the base64 alphabet in place of one # from the urlsafe-base64 alphabet. lambda voucher: b"/" + voucher[1:], ), ) def is_urlsafe_base64(text): """ :param str text: A candidate text string to inspect. :return bool: ``True`` if and only if ``text`` is urlsafe-base64 encoded """ try: urlsafe_b64decode(text) except: return False return True def invalid_bodies(): """ Build byte strings that ``PUT /voucher`` considers invalid. """ return one_of( # The wrong key but the right kind of value. fixed_dictionaries( { "some-key": vouchers().map(lambda v: v.decode("utf-8")), } ).map(dumps_utf8), # The right key but the wrong kind of value. fixed_dictionaries( { "voucher": one_of( integers(), not_vouchers().map(lambda v: v.decode("utf-8")), ), } ).map(dumps_utf8), # Not even JSON binary().filter(is_not_json), ) def root_from_config(config, now): """ Create a client root resource from a Tahoe-LAFS configuration. :param _Config config: The Tahoe-LAFS configuration. :param now: A no-argument callable that returns the time of the call as a ``datetime`` instance. :return IResource: The root client resource. """ return from_configuration( config, VoucherStore.from_node_config( config, now, memory_connect, ), clock=Clock(), ) def authorized_request(api_auth_token, agent, method, uri, headers=None, data=None): """ Issue a request with the required token-based authorization header value. :param bytes api_auth_token: The API authorization token to include. :param IAgent agent: The agent to use to issue the request. :param bytes method: The HTTP method for the request. :param bytes uri: The URI for the request. :param ({bytes: [bytes]})|None headers: If not ``None``, extra request headers to include. The **Authorization** header will be overwritten if it is present. :param BytesIO|None data: If not ``None``, the request body. :return: A ``Deferred`` like the one returned by ``IAgent.request``. """ if data is None: bodyProducer = None else: bodyProducer = FileBodyProducer(data, cooperator=uncooperator()) if headers is None: headers = Headers() else: headers = Headers(headers) headers.setRawHeaders( "authorization", [b"tahoe-lafs " + api_auth_token], ) return agent.request( method, uri, headers=headers, bodyProducer=bodyProducer, ) def get_config_with_api_token(tempdir, get_config, api_auth_token): """ Get a ``_Config`` object. :param TempDir tempdir: A temporary directory in which to create the Tahoe-LAFS node associated with the configuration. :param (bytes -> bytes -> _Config) get_config: A function which takes a node directory and a Foolscap "portnum" filename and returns the configuration object. :param bytes api_auth_token: The HTTP API authorization token to write to the node directory. """ basedir = tempdir.join("tahoe") config = get_config(basedir, "tub.port") add_api_token_to_config( basedir, config, api_auth_token, ) return config def add_api_token_to_config(basedir, config, api_auth_token): """ Create a private directory beneath the given base directory, point the given config at it, and write the given API auth token to it. """ FilePath(basedir).child("private").makedirs() config._basedir = basedir config.write_private_config("api_auth_token", api_auth_token) class OpenAPITests(TestCase): """ Tests for the OpenAPI specification for the HTTP API. """ def test_backup_recovery_valid(self): """ The specification document is valid OpenAPI 3.0. """ spec_path = FilePath(package_init_file).sibling("backup-recovery.yaml") spec_dict, spec_url = read_from_filename(spec_path.path) # If no exception is raised then the spec is valid. validate_spec(spec_dict) class FromConfigurationTests(TestCase): """ Tests for ``from_configuration``. """ @given(tahoe_configs()) def test_allowed_public_keys(self, get_config): """ The controller created by ``from_configuration`` is configured to allow the public keys found in the configuration. """ tempdir = self.useFixture(TempDir()) config = get_config(tempdir.join("tahoe"), "tub.port") allowed_public_keys = get_configured_allowed_public_keys(config) # root_from_config is just an easier way to call from_configuration root = root_from_config(config, datetime.now) self.assertThat( root.controller, MatchesStructure( allowed_public_keys=Equals(allowed_public_keys), ), ) class GetTokenCountTests(TestCase): """ Tests for ``get_token_count``. """ @given(one_of(none(), integers(min_value=16))) def test_get_token_count(self, token_count): """ ``get_token_count`` returns the integer value of the ``default-token-count`` item from the given configuration object. """ plugin_name = "hello-world" if token_count is None: expected_count = NUM_TOKENS token_config = {} else: expected_count = token_count token_config = {"default-token-count": f"{expected_count}"} config_text = config_string_from_sections( [ { "storageclient.plugins." + plugin_name: token_config, } ] ) node_config = config_from_string( self.useFixture(TempDir()).join("tahoe"), "tub.port", config_text.encode("utf-8"), ) self.assertThat( get_token_count(plugin_name, node_config), Equals(expected_count), ) class ResourceTests(TestCase): """ General tests for the resources exposed by the plugin. """ @given( tahoe_configs(), request_paths(), ) def test_unauthorized(self, get_config, path): """ A request for any resource without the required authorization token receives a 401 response. """ tempdir = self.useFixture(TempDir()) config = get_config(tempdir.join("tahoe"), "tub.port") root = root_from_config(config, datetime.now) agent = RequestTraversalAgent(root) requesting = agent.request( b"GET", b"http://127.0.0.1/" + b"/".join(path), ) responses = [] requesting.addCallback(responses.append) self.assertThat( requesting, succeeded(Always()), ) [response] = responses self.assertThat( response.code, Equals(UNAUTHORIZED), ) @given( tahoe_configs(), requests( sampled_from( [ [b"unblinded-token"], [b"voucher"], [b"version"], ] ) ), ) def test_reachable(self, get_config, request): """ A resource is reachable at a child of the resource returned by ``from_configuration``. """ tempdir = self.useFixture(TempDir()) config = get_config(tempdir.join("tahoe"), "tub.port") root = root_from_config(config, datetime.now) self.assertThat( getChildForRequest(root, request), Provides([IResource]), ) @given( tahoe_configs(), api_auth_tokens(), ) def test_version(self, get_config, api_auth_token): """ The ZKAPAuthorizer package version is available in a JSON response to a **GET** to ``/version``. """ config = get_config_with_api_token( self.useFixture(TempDir()), get_config, api_auth_token, ) root = root_from_config(config, datetime.now) agent = RequestTraversalAgent(root) requesting = authorized_request( api_auth_token, agent, b"GET", b"http://127.0.0.1/version", ) self.assertThat( requesting, succeeded( matches_response( code_matcher=Equals(OK), body_matcher=AfterPreprocessing( loads, Equals({"version": zkapauthorizer_version}), ), ), ), ) def maybe_extra_tokens(): """ Build either ``None`` or a small integer for use in determining a number of additional tokens to create in some tests. """ # We might want to have some unblinded tokens or we might not. return one_of( just(None), # If we do, we can't have fewer than the number of redemption groups # which we don't know until we're further inside the test. So supply # an amount to add to that, in the case where we have tokens at all. integers(min_value=0, max_value=100), ) class UnblindedTokenTests(TestCase): """ Tests relating to ``/unblinded-token`` as implemented by the ``_zkapauthorizer.resource`` module. """ def setUp(self): super(UnblindedTokenTests, self).setUp() self.useFixture(CaptureTwistedLogs()) @given( tahoe_configs(), api_auth_tokens(), lists( lists( integers(min_value=0, max_value=2 ** 63 - 1), min_size=1, ), ), datetimes(), ) def test_latest_lease_maintenance_spending( self, get_config, api_auth_token, size_observations, now ): """ The most recently completed record of lease maintenance spending activity is reported in the response to a **GET** request. """ config = get_config_with_api_token( self.useFixture(TempDir()), get_config, api_auth_token, ) root = root_from_config(config, lambda: now) # Put some activity into it. total = 0 activity = root.store.start_lease_maintenance() for sizes in size_observations: total += required_passes(root.store.pass_value, sizes) activity.observe(sizes) activity.finish() agent = RequestTraversalAgent(root) d = authorized_request( api_auth_token, agent, b"GET", b"http://127.0.0.1/lease-maintenance", ) d.addCallback(readBody) d.addCallback( lambda body: loads(body)["spending"], ) self.assertThat( d, succeeded( Equals( { "when": now.isoformat(), "count": total, } ) ), ) def matches_lease_maintenance_spending(): """ :return: A matcher which matches the value of the *spending* key in the ``lease-maintenance`` endpoint response. """ return MatchesAny( Is(None), ContainsDict( { "when": matches_iso8601_datetime(), "amount": matches_positive_integer(), } ), ) def matches_positive_integer(): return MatchesAll( IsInstance(int), GreaterThan(0), ) def matches_iso8601_datetime(): """ :return: A matcher which matches text strings which can be parsed as an ISO8601 datetime string. """ return MatchesAll( IsInstance(str), AfterPreprocessing( parse_datetime, lambda d: Always(), ), ) class VoucherTests(TestCase): """ Tests relating to ``/voucher`` as implemented by the ``_zkapauthorizer.resource`` module and its handling of vouchers. """ def setUp(self): super(VoucherTests, self).setUp() self.useFixture(CaptureTwistedLogs()) @given(tahoe_configs(), api_auth_tokens(), vouchers()) def test_put_voucher(self, get_config, api_auth_token, voucher): """ When a voucher is ``PUT`` to ``VoucherCollection`` it is passed in to the redemption model object for handling and an ``OK`` response is returned. """ config = get_config_with_api_token( self.useFixture(TempDir()), get_config, api_auth_token, ) root = root_from_config(config, datetime.now) agent = RequestTraversalAgent(root) data = BytesIO(dumps_utf8({"voucher": voucher.decode("ascii")})) requesting = authorized_request( api_auth_token, agent, b"PUT", b"http://127.0.0.1/voucher", data=data, ) self.addDetail( "requesting result", text_content(f"{vars(requesting.result)}"), ) self.assertThat( requesting, succeeded( ok_response(), ), ) @given(tahoe_configs(), api_auth_tokens(), invalid_bodies()) def test_put_invalid_body(self, get_config, api_auth_token, body): """ If the body of a ``PUT`` to ``VoucherCollection`` does not consist of an object with a single *voucher* property then the response is *BAD REQUEST*. """ config = get_config_with_api_token( self.useFixture(TempDir()), get_config, api_auth_token, ) root = root_from_config(config, datetime.now) agent = RequestTraversalAgent(root) requesting = authorized_request( api_auth_token, agent, b"PUT", b"http://127.0.0.1/voucher", data=BytesIO(body), ) self.addDetail( "requesting result", text_content(f"{vars(requesting.result)}"), ) self.assertThat( requesting, succeeded( bad_request_response(), ), ) @given(tahoe_configs(), api_auth_tokens(), not_vouchers()) def test_get_invalid_voucher(self, get_config, api_auth_token, not_voucher): """ When a syntactically invalid voucher is requested with a ``GET`` to a child of ``VoucherCollection`` the response is **BAD REQUEST**. """ config = get_config_with_api_token( self.useFixture(TempDir()), get_config, api_auth_token, ) root = root_from_config(config, datetime.now) agent = RequestTraversalAgent(root) url = "http://127.0.0.1/voucher/{}".format( quote( not_voucher, safe=b"", ), ).encode("ascii") requesting = authorized_request( api_auth_token, agent, b"GET", url, ) self.assertThat( requesting, succeeded( bad_request_response(), ), ) @given(tahoe_configs(), api_auth_tokens(), vouchers()) def test_get_unknown_voucher(self, get_config, api_auth_token, voucher): """ When a voucher is requested with a ``GET`` to a child of ``VoucherCollection`` the response is **NOT FOUND** if the voucher hasn't previously been submitted with a ``PUT``. """ config = get_config_with_api_token( self.useFixture(TempDir()), get_config, api_auth_token, ) root = root_from_config(config, datetime.now) agent = RequestTraversalAgent(root) requesting = authorized_request( api_auth_token, agent, b"GET", b"http://127.0.0.1/voucher/" + voucher, ) self.assertThat( requesting, succeeded( not_found_response(), ), ) @given( direct_tahoe_configs(client_nonredeemer_configurations()), api_auth_tokens(), datetimes(), vouchers(), ) def test_get_known_voucher_redeeming(self, config, api_auth_token, now, voucher): """ When a voucher is first ``PUT`` and then later a ``GET`` is issued for the same voucher then the response code is **OK** and details, including those relevant to a voucher which is actively being redeemed, about the voucher are included in a json-encoded response body. """ count = get_token_count(NAME, config) return self._test_get_known_voucher( config, api_auth_token, now, voucher, MatchesStructure( number=Equals(voucher), expected_tokens=Equals(count), created=Equals(now), state=Equals( Redeeming( started=now, counter=0, ) ), ), ) @given( direct_tahoe_configs(client_dummyredeemer_configurations()), api_auth_tokens(), datetimes(), vouchers(), ) def test_get_known_voucher_redeemed(self, config, api_auth_token, now, voucher): """ When a voucher is first ``PUT`` and then later a ``GET`` is issued for the same voucher then the response code is **OK** and details, including those relevant to a voucher which has been redeemed, about the voucher are included in a json-encoded response body. """ count = get_token_count(NAME, config) return self._test_get_known_voucher( config, api_auth_token, now, voucher, MatchesStructure( number=Equals(voucher), expected_tokens=Equals(count), created=Equals(now), state=Equals( Redeemed( finished=now, token_count=count, ) ), ), ) @given( direct_tahoe_configs(client_doublespendredeemer_configurations()), api_auth_tokens(), datetimes(), vouchers(), ) def test_get_known_voucher_doublespend(self, config, api_auth_token, now, voucher): """ When a voucher is first ``PUT`` and then later a ``GET`` is issued for the same voucher then the response code is **OK** and details, including those relevant to a voucher which has failed redemption because it was already redeemed, about the voucher are included in a json-encoded response body. """ count = get_token_count(NAME, config) return self._test_get_known_voucher( config, api_auth_token, now, voucher, MatchesStructure( number=Equals(voucher), expected_tokens=Equals(count), created=Equals(now), state=Equals( DoubleSpend( finished=now, ) ), ), ) @given( direct_tahoe_configs(client_unpaidredeemer_configurations()), api_auth_tokens(), datetimes(), vouchers(), ) def test_get_known_voucher_unpaid(self, config, api_auth_token, now, voucher): """ When a voucher is first ``PUT`` and then later a ``GET`` is issued for the same voucher then the response code is **OK** and details, including those relevant to a voucher which has failed redemption because it has not been paid for yet, about the voucher are included in a json-encoded response body. """ count = get_token_count(NAME, config) return self._test_get_known_voucher( config, api_auth_token, now, voucher, MatchesStructure( number=Equals(voucher), expected_tokens=Equals(count), created=Equals(now), state=Equals( Unpaid( finished=now, ) ), ), ) @given( direct_tahoe_configs(client_errorredeemer_configurations(TRANSIENT_ERROR)), api_auth_tokens(), datetimes(), vouchers(), ) def test_get_known_voucher_error(self, config, api_auth_token, now, voucher): """ When a voucher is first ``PUT`` and then later a ``GET`` is issued for the same voucher then the response code is **OK** and details, including those relevant to a voucher which has failed redemption due to any kind of transient conditions, about the voucher are included in a json-encoded response body. """ count = get_token_count(NAME, config) return self._test_get_known_voucher( config, api_auth_token, now, voucher, MatchesStructure( number=Equals(voucher), expected_tokens=Equals(count), created=Equals(now), state=Equals( Error( finished=now, details=TRANSIENT_ERROR, ) ), ), ) def _test_get_known_voucher( self, config, api_auth_token, now, voucher, voucher_matcher ): """ Assert that a voucher that is ``PUT`` and then ``GET`` is represented in the JSON response. :param voucher_matcher: A matcher which matches the voucher expected to be returned by the ``GET``. """ add_api_token_to_config( self.useFixture(TempDir()).join("tahoe"), config, api_auth_token, ) root = root_from_config(config, lambda: now) agent = RequestTraversalAgent(root) putting = authorized_request( api_auth_token, agent, b"PUT", b"http://127.0.0.1/voucher", data=BytesIO(dumps_utf8({"voucher": voucher.decode("ascii")})), ) self.assertThat( putting, succeeded( ok_response(), ), ) getting = authorized_request( api_auth_token, agent, b"GET", "http://127.0.0.1/voucher/{}".format( quote( voucher, safe="", ), ).encode("ascii"), ) self.assertThat( getting, succeeded( MatchesAll( ok_response(headers=application_json()), AfterPreprocessing( readBody, succeeded( AfterPreprocessing( Voucher.from_json, voucher_matcher, ), ), ), ), ), ) @given( direct_tahoe_configs(), api_auth_tokens(), datetimes(), lists(vouchers(), unique=True), ) def test_list_vouchers(self, config, api_auth_token, now, vouchers): """ A ``GET`` to the ``VoucherCollection`` itself returns a list of existing vouchers. """ count = get_token_count(NAME, config) return self._test_list_vouchers( config, api_auth_token, now, vouchers, Equals( { "vouchers": list( Voucher( number=voucher, expected_tokens=count, created=now, state=Redeemed( finished=now, token_count=count, ), ).marshal() for voucher in vouchers ), } ), ) @given( direct_tahoe_configs(client_unpaidredeemer_configurations()), api_auth_tokens(), datetimes(), lists(vouchers(), unique=True), ) def test_list_vouchers_transient_states( self, config, api_auth_token, now, vouchers ): """ A ``GET`` to the ``VoucherCollection`` itself returns a list of existing vouchers including state information that reflects transient states. """ count = get_token_count(NAME, config) return self._test_list_vouchers( config, api_auth_token, now, vouchers, Equals( { "vouchers": list( Voucher( number=voucher, expected_tokens=count, created=now, state=Unpaid( finished=now, ), ).marshal() for voucher in vouchers ), } ), ) def _test_list_vouchers( self, config, api_auth_token, now, vouchers, match_response_object ): add_api_token_to_config( # Hypothesis causes our test case instances to be re-used many # times between setUp and tearDown. Avoid re-using the same # temporary directory for every Hypothesis iteration because this # test leaves state behind that invalidates future iterations. self.useFixture(TempDir()).join("tahoe"), config, api_auth_token, ) root = root_from_config(config, lambda: now) agent = RequestTraversalAgent(root) note("{} vouchers".format(len(vouchers))) for voucher in vouchers: data = BytesIO(dumps_utf8({"voucher": voucher.decode("ascii")})) putting = authorized_request( api_auth_token, agent, b"PUT", b"http://127.0.0.1/voucher", data=data, ) self.assertThat( putting, succeeded( ok_response(), ), ) getting = authorized_request( api_auth_token, agent, b"GET", b"http://127.0.0.1/voucher", ) self.assertThat( getting, succeeded( MatchesAll( ok_response(headers=application_json()), AfterPreprocessing( json_content, succeeded( match_response_object, ), ), ), ), ) def mime_types(blacklist: Optional[Set[str]] = None) -> SearchStrategy[str]: """ Build MIME types as b"major/minor" byte strings. :param blacklist: If not ``None``, MIME types to exclude from the result. """ if blacklist is None: blacklist = set() return ( tuples( text(), text(), ) .map( "/".join, ) .filter( lambda content_type: content_type not in blacklist, ) ) @attr.s class Request(object): """ Represent some of the parameters of an HTTP request. """ method = attr.ib() headers = attr.ib() data = attr.ib() def bad_calculate_price_requests(): """ Build Request instances describing requests which are not allowed at the ``/calculate-price`` endpoint. """ good_methods = just(b"POST") bad_methods = sampled_from( [ b"GET", b"HEAD", b"PUT", b"PATCH", b"OPTIONS", b"FOO", ] ) good_headers = just({b"content-type": [b"application/json"]}) bad_headers = fixed_dictionaries( { b"content-type": mime_types(blacklist={b"application/json"},).map( lambda content_type: [content_type.encode("utf-8")], ), } ) good_version = just(1) bad_version = one_of( text(), lists(integers()), integers(max_value=0), integers(min_value=2), ) good_sizes = lists(integers(min_value=0)) bad_sizes = one_of( integers(), text(), lists(text(), min_size=1), dictionaries(text(), text()), lists(integers(max_value=-1), min_size=1), ) good_data = fixed_dictionaries( { "version": good_version, "sizes": good_sizes, } ).map(dumps_utf8) bad_data_version = fixed_dictionaries( { "version": bad_version, "sizes": good_sizes, } ).map(dumps_utf8) bad_data_sizes = fixed_dictionaries( { "version": good_version, "sizes": bad_sizes, } ).map(dumps_utf8) bad_data_other = dictionaries( text(), integers(), ).map(dumps_utf8) bad_data_junk = binary() good_fields = { "method": good_methods, "headers": good_headers, "data": good_data, } bad_choices = [ ("method", bad_methods), ("headers", bad_headers), ("data", bad_data_version), ("data", bad_data_sizes), ("data", bad_data_other), ("data", bad_data_junk), ] def merge(fields, key, value): fields = fields.copy() fields[key] = value return fields return sampled_from(bad_choices,).flatmap( lambda bad_choice: builds(Request, **merge(good_fields, *bad_choice)), ) class CalculatePriceTests(TestCase): """ Tests relating to ``/calculate-price`` as implemented by the ``_zkapauthorizer.resource`` module. """ url = b"http://127.0.0.1/calculate-price" @given( tahoe_configs(), api_auth_tokens(), bad_calculate_price_requests(), ) def test_bad_request(self, get_config, api_auth_token, bad_request): """ When approached with: * a method other than POST * a content-type other than **application/json** * a request body which is not valid JSON * a JSON request body without version and sizes properties * a JSON request body without a version of 1 * a JSON request body with other properties * or a JSON request body with sizes other than a list of integers response code is not in the 200 range. """ config = get_config_with_api_token( self.useFixture(TempDir()), get_config, api_auth_token, ) root = root_from_config(config, datetime.now) agent = RequestTraversalAgent(root) self.assertThat( authorized_request( api_auth_token, agent, bad_request.method, self.url, headers=bad_request.headers, data=BytesIO(bad_request.data), ), succeeded( matches_response( code_matcher=MatchesAny( # It is fine to signal client errors between(400, 499), # It is fine to say we didn't implement the request # method (I guess - Twisted Web sort of forces it on # us, I'd rather have NOT ALLOWED for this case # instead...). We don't want INTERNAL SERVER ERROR # though. Equals(NOT_IMPLEMENTED), ), ), ), ) @given( tuples( # Make the share encoding parameters easily accessible without # going through the Tahoe-LAFS configuration. share_parameters(), # Same goes for the minimum lease time remaining configuration. posix_timestamps().map(int), ).flatmap( lambda share_and_lease_time: tuples( just(share_and_lease_time), direct_tahoe_configs( zkapauthz_v2_configuration=client_dummyredeemer_configurations( min_times_remaining=just(share_and_lease_time[1]), ), shares=just(share_and_lease_time[0]), ), ), ), api_auth_tokens(), lists(integers(min_value=0)), ) def test_calculated_price(self, encoding_params_and_config, api_auth_token, sizes): """ A well-formed request returns the price in ZKAPs as an integer and the storage period (the minimum allowed) that they pay for. """ (encoding_params, min_time_remaining), config = encoding_params_and_config shares_needed, shares_happy, shares_total = encoding_params add_api_token_to_config( self.useFixture(TempDir()).join("tahoe"), config, api_auth_token, ) root = root_from_config(config, datetime.now) agent = RequestTraversalAgent(root) expected_price = PriceCalculator( shares_needed=shares_needed, shares_total=shares_total, pass_value=get_configured_pass_value(config), ).calculate(sizes) self.assertThat( authorized_request( api_auth_token, agent, b"POST", self.url, headers={b"content-type": [b"application/json"]}, data=BytesIO(dumps_utf8({"version": 1, "sizes": sizes})), ), succeeded( matches_response( code_matcher=Equals(OK), headers_matcher=application_json(), body_matcher=AfterPreprocessing( loads, Equals( { "price": expected_price, "period": 60 * 60 * 24 * 31 - min_time_remaining, } ), ), ), ), ) def application_json(): return AfterPreprocessing( lambda h: h.getRawHeaders("content-type"), Equals(["application/json"]), ) def json_content(response): reading = readBody(response) reading.addCallback(loads) return reading def ok_response(headers=None): return match_response(OK, headers, phrase=Equals(b"OK")) def not_found_response(headers=None): return match_response(NOT_FOUND, headers) def bad_request_response(headers=None): return match_response(BAD_REQUEST, headers) def match_response(code, headers, phrase=Always()): if headers is None: headers = Always() return _MatchResponse( code=Equals(code), headers=headers, phrase=phrase, ) @attr.s class _MatchResponse(object): code = attr.ib() headers = attr.ib() phrase = attr.ib() _details = attr.ib(default=attr.Factory(dict)) def match(self, response): self._details.update( { "code": response.code, "headers": response.headers.getAllRawHeaders(), } ) return MatchesStructure( code=self.code, headers=self.headers, phrase=self.phrase, ).match(response) def get_details(self): return self._details