Skip to content
Snippets Groups Projects
test_controller.py 16.4 KiB
Newer Older
  • Learn to ignore specific revisions
  • # Copyright 2019 PrivateStorage.io, LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    """
    Tests for ``_zkapauthorizer.controller``.
    """
    
    
    from __future__ import (
        absolute_import,
    )
    
    
    from json import (
        loads,
        dumps,
    )
    
    from functools import (
        partial,
    )
    
    from zope.interface import (
        implementer,
    )
    
    from testtools import (
        TestCase,
    )
    
    from testtools.content import (
        text_content,
    )
    
    from testtools.matchers import (
        Equals,
    
        MatchesAll,
        AllMatch,
        IsInstance,
        HasLength,
    
    )
    from testtools.twistedsupport import (
        succeeded,
    
    )
    
    from hypothesis import (
        given,
    )
    
    from hypothesis.strategies import (
        integers,
    
    from twisted.python.url import (
        URL,
    )
    
    from twisted.internet.defer import (
        fail,
    )
    from twisted.web.iweb import (
        IAgent,
    )
    from twisted.web.resource import (
    
    from twisted.web.http import (
        UNSUPPORTED_MEDIA_TYPE,
    
    from treq.testing import (
    
    
    from privacypass import (
        SecurityException,
        PublicKey,
        BlindedToken,
        BatchDLEQProof,
    
        TokenPreimage,
        VerificationSignature,
    
    from ..controller import (
    
        NonRedeemer,
        DummyRedeemer,
    
        DoubleSpendRedeemer,
    
        RistrettoRedeemer,
    
        PaymentController,
    
        Pending as model_Pending,
        DoubleSpend as model_DoubleSpend,
        Redeemed as model_Redeemed,
    
    )
    
    from .strategies import (
        tahoe_configs,
        vouchers,
    
    from .matchers import (
        Provides,
    )
    
    from .fixtures import (
        TemporaryVoucherStore,
    )
    
    
    
    class PaymentControllerTests(TestCase):
        """
        Tests for ``PaymentController``.

        Each test creates its own voucher store with ``TemporaryVoucherStore``,
        passing it the generated Tahoe configuration and a clock function that
        always returns the generated ``now``.
        """

        @given(tahoe_configs(), datetimes(), vouchers())
        def test_not_redeemed_while_redeeming(self, get_config, now, voucher):
            """
            A ``Voucher`` is not marked redeemed before ``IRedeemer.redeem``
            completes.
            """
            store = self.useFixture(TemporaryVoucherStore(get_config, lambda: now)).store

            # NOTE(review): presumably ``NonRedeemer`` never completes a
            # redemption, which is what keeps the voucher pending -- confirm
            # against its definition in ``..controller``.
            controller = PaymentController(
                store,
                NonRedeemer(),
            )
            controller.redeem(voucher)

            persisted_voucher = store.get(voucher)
            self.assertThat(
                persisted_voucher.state,
                Equals(model_Pending()),
            )

        @given(tahoe_configs(), datetimes(), vouchers())
        def test_redeemed_after_redeeming(self, get_config, now, voucher):
            """
            A ``Voucher`` is marked as redeemed after ``IRedeemer.redeem`` succeeds.
            """
            store = self.useFixture(TemporaryVoucherStore(get_config, lambda: now)).store

            controller = PaymentController(
                store,
                DummyRedeemer(),
            )
            controller.redeem(voucher)

            persisted_voucher = store.get(voucher)
            # The store's clock always returns ``now`` so the completion time
            # recorded for the redemption is expected to equal it exactly.
            self.assertThat(
                persisted_voucher.state,
                Equals(model_Redeemed(
                    finished=now,
                    token_count=100,
                )),
            )

        @given(tahoe_configs(), datetimes(), vouchers())
        def test_double_spent_after_double_spend(self, get_config, now, voucher):
            """
            A ``Voucher`` is marked as double-spent after ``IRedeemer.redeem`` fails
            with ``AlreadySpent``.
            """
            store = self.useFixture(TemporaryVoucherStore(get_config, lambda: now)).store

            controller = PaymentController(
                store,
                DoubleSpendRedeemer(),
            )
            controller.redeem(voucher)

            persisted_voucher = store.get(voucher)
            # NOTE(review): the tail of this assertion was missing from the
            # file.  ``finished=now`` mirrors the ``model_Redeemed`` assertion
            # in ``test_redeemed_after_redeeming`` -- confirm against the
            # ``DoubleSpend`` model's constructor.
            self.assertThat(
                persisted_voucher,
                MatchesStructure(
                    state=Equals(model_DoubleSpend(
                        finished=now,
                    )),
                ),
            )

        @given(tahoe_configs(), datetimes(), vouchers())
        def test_redeem_pending_on_startup(self, get_config, now, voucher):
            """
            When ``PaymentController`` is created, any vouchers in the store in the
            pending state are redeemed.
            """
            store = self.useFixture(TemporaryVoucherStore(get_config, lambda: now)).store

            # Create the voucher state in the store with a redemption that will
            # certainly fail.
            unpaid_controller = PaymentController(
                store,
                UnpaidRedeemer(),
            )
            unpaid_controller.redeem(voucher)

            # Make sure we got where we wanted.
            self.assertThat(
                unpaid_controller.get_voucher(voucher).state,
                IsInstance(model_Unpaid),
            )

            # Create another controller with the same store.  It will see the
            # voucher state and attempt a redemption on its own.  It has I/O as an
            # `__init__` side-effect. :/
            success_controller = PaymentController(
                store,
                DummyRedeemer(),
            )

            self.assertThat(
                success_controller.get_voucher(voucher).state,
                IsInstance(model_Redeemed),
            )
    
    
    # Placeholder issuer URL handed to ``RistrettoRedeemer`` by the tests in this
    # module.
    NOWHERE = URL.from_text(u"https://127.0.0.1/")
    
    
    class RistrettoRedeemerTests(TestCase):
        """
        Tests for ``RistrettoRedeemer``.

        Apart from ``test_interface``, each test routes the redeemer's requests
        to a local issuer resource through ``treq_for_loopback_ristretto``.
        """
        def test_interface(self):
            """
            An ``RistrettoRedeemer`` instance provides ``IRedeemer``.
            """
            redeemer = RistrettoRedeemer(stub_agent(), NOWHERE)
            self.assertThat(
                redeemer,
                Provides([IRedeemer]),
            )

        @given(voucher_objects(), integers(min_value=1, max_value=100))
        def test_good_ristretto_redemption(self, voucher, num_tokens):
            """
            If the issuer returns a successful result then
            ``RistrettoRedeemer.redeem`` returns a ``Deferred`` that fires with a
            list of ``UnblindedToken`` instances.
            """
            signing_key = random_signing_key()
            issuer = RistrettoRedemption(signing_key)
            treq = treq_for_loopback_ristretto(issuer)
            redeemer = RistrettoRedeemer(treq, NOWHERE)

            random_tokens = redeemer.random_tokens_for_voucher(voucher, num_tokens)
            d = redeemer.redeem(
                voucher,
                random_tokens,
            )
            self.assertThat(
                d,
                succeeded(
                    MatchesAll(
                        AllMatch(
                            IsInstance(UnblindedToken),
                        ),
                        # NOTE(review): the tail of this assertion was missing
                        # from the file.  The length check is reconstructed
                        # from the ``HasLength`` import -- confirm that one
                        # unblinded token per random token is the intended
                        # contract.
                        HasLength(num_tokens),
                    ),
                ),
            )

        @given(voucher_objects(), integers(min_value=1, max_value=100))
        def test_redemption_denied_alreadyspent(self, voucher, num_tokens):
            """
            If the issuer declines to allow the voucher to be redeemed and gives a
            reason that the voucher has already been spent, ``RistrettoRedeem``
            returns a ``Deferred`` that fires with a ``Failure`` wrapping
            ``AlreadySpent``.
            """
            issuer = AlreadySpentRedemption()
            treq = treq_for_loopback_ristretto(issuer)
            redeemer = RistrettoRedeemer(treq, NOWHERE)
            random_tokens = redeemer.random_tokens_for_voucher(voucher, num_tokens)
            d = redeemer.redeem(
                voucher,
                random_tokens,
            )
            self.assertThat(
                d,
                failed(
                    # Match against the exception wrapped by the ``Failure``.
                    AfterPreprocessing(
                        lambda f: f.value,
                        IsInstance(AlreadySpent),
                    ),
                ),
            )

        @given(voucher_objects(), integers(min_value=1, max_value=100))
        def test_redemption_denied_unpaid(self, voucher, num_tokens):
            """
            If the issuer declines to allow the voucher to be redeemed and gives a
            reason that the voucher has not been paid for, ``RistrettoRedeem``
            returns a ``Deferred`` that fires with a ``Failure`` wrapping
            ``Unpaid``.
            """
            issuer = UnpaidRedemption()
            treq = treq_for_loopback_ristretto(issuer)
            redeemer = RistrettoRedeemer(treq, NOWHERE)
            random_tokens = redeemer.random_tokens_for_voucher(voucher, num_tokens)
            d = redeemer.redeem(
                voucher,
                random_tokens,
            )
            self.assertThat(
                d,
                failed(
                    # Match against the exception wrapped by the ``Failure``.
                    AfterPreprocessing(
                        lambda f: f.value,
                        IsInstance(Unpaid),
                    ),
                ),
            )

        @given(voucher_objects(), integers(min_value=1, max_value=100))
        def test_bad_ristretto_redemption(self, voucher, num_tokens):
            """
            If the issuer returns a successful result with an invalid proof then
            ``RistrettoRedeemer.redeem`` returns a ``Deferred`` that fires with a
            ``Failure`` wrapping ``SecurityException``.
            """
            signing_key = random_signing_key()
            issuer = RistrettoRedemption(signing_key)

            # Make it lie about the public key it is using.  This causes the proof
            # to be invalid since it proves the signature was made with a
            # different key than reported in the response.
            issuer.public_key = PublicKey.from_signing_key(random_signing_key())

            treq = treq_for_loopback_ristretto(issuer)
            redeemer = RistrettoRedeemer(treq, NOWHERE)
            random_tokens = redeemer.random_tokens_for_voucher(voucher, num_tokens)
            d = redeemer.redeem(
                voucher,
                random_tokens,
            )
            # Attach the Deferred's representation to the test result to help
            # diagnose a failure of the assertion below.
            self.addDetail(u"redeem Deferred", text_content(str(d)))
            self.assertThat(
                d,
                failed(
                    AfterPreprocessing(
                        lambda f: f.value,
                        IsInstance(SecurityException),
                    ),
                ),
            )

        @given(voucher_objects(), integers(min_value=1, max_value=100))
        def test_ristretto_pass_construction(self, voucher, num_tokens):
            """
            The passes constructed using unblinded tokens and messages pass the
            Ristretto verification check.
            """
            message = b"hello world"

            signing_key = random_signing_key()
            issuer = RistrettoRedemption(signing_key)
            treq = treq_for_loopback_ristretto(issuer)
            redeemer = RistrettoRedeemer(treq, NOWHERE)

            random_tokens = redeemer.random_tokens_for_voucher(voucher, num_tokens)
            d = redeemer.redeem(
                voucher,
                random_tokens,
            )

            # Turn the unblinded tokens the redemption produces into passes
            # bound to ``message`` so the server-side check can be run on them.
            def unblinded_tokens_to_passes(unblinded_tokens):
                passes = redeemer.tokens_to_passes(message, unblinded_tokens)
                return passes
            d.addCallback(unblinded_tokens_to_passes)

            self.assertThat(
                d,
                succeeded(
                    AfterPreprocessing(
                        partial(ristretto_verify, signing_key, message),
                        Equals(True),
                    ),
                ),
            )
    
    
    def ristretto_verify(signing_key, message, marshaled_passes):
        """
        Verify that the given passes were generated in a process that involved a
        signature from the given signing key and using the given message.

        :param privacypass.SigningKey signing_key: A signing key which should have
            signed some random blinded tokens earlier in the lifecycle of the
            passes to verify.

        :param bytes message: Request binding data which is involved in the
            generation of the passes to verify.

        :param list marshaled_passes: Token preimages and corresponding
            message signatures to verify.  Each element has a ``text``
            attribute containing two space-separated base64 encoded values,
            the first representing the preimage and the second representing
            the signature.

        :return bool: ``True`` if and only if all of the passes represented by
            ``marshaled_passes`` pass the Ristretto-defined verification for an
            exchange using the given signing key and message.  An empty
            collection of passes is vacuously valid.
        """
        def decode(marshaled_pass):
            # Split the text form of one pass into its preimage and signature
            # and decode each from base64.
            t, s = marshaled_pass.split(u" ")
            return (
                TokenPreimage.decode_base64(t.encode("ascii")),
                VerificationSignature.decode_base64(s.encode("ascii")),
            )

        servers_passes = [
            decode(marshaled_pass.text)
            for marshaled_pass
            in marshaled_passes
        ]
        # Recompute, from the signing key, the unblinded token belonging to
        # each preimage.
        servers_unblinded_tokens = [
            signing_key.rederive_unblinded_token(token_preimage)
            for (token_preimage, sig)
            in servers_passes
        ]
        servers_verification_sigs = [
            sig
            for (token_preimage, sig)
            in servers_passes
        ]
        servers_verification_keys = [
            unblinded_token.derive_verification_key_sha512()
            for unblinded_token
            in servers_unblinded_tokens
        ]
        # Every pass is checked (no short-circuit): an element is truthy when
        # the signature does not verify for ``message`` under the key.
        invalid_passes = [
            key.invalid_sha512(
                sig,
                message,
            )
            for (key, sig)
            in zip(servers_verification_keys, servers_verification_sigs)
        ]

        return not any(invalid_passes)
    
    
    def treq_for_loopback_ristretto(local_issuer):
        """
        Create a ``treq``-alike which can dispatch to a local issuer.

        :param local_issuer: The resource to publish as the ``redeem`` child of
            a ``v1`` resource, itself a child of the root resource.

        :return: A ``StubTreq`` wrapping the root of that resource tree.
        """
        v1 = Resource()
        v1.putChild(b"redeem", local_issuer)
        root = Resource()
        root.putChild(b"v1", v1)

        return StubTreq(root)
    
    
    
    @implementer(IAgent)
    class _StubAgent(object):
        """
        An object declared to implement ``IAgent`` whose ``request`` method
        never succeeds.
        """
        def request(self, method, uri, headers=None, bodyProducer=None):
            """
            Ignore every argument and return the result of ``fail`` called with
            a generic ``Exception``.
            """
            error = Exception("It's only a model.")
            return fail(error)
    
    
    def stub_agent():
        """
        Create and return a new ``_StubAgent`` instance.
        """
        agent = _StubAgent()
        return agent
    
    class AlreadySpentRedemption(Resource):
        """
        A stand-in for the Ristretto redemption server which never allows a
        voucher to be redeemed.  Every well-formed request is answered with an
        error whose reason is ``double-spend``.
        """
        def render_POST(self, request):
            """
            Refuse the redemption.  A request whose ``content-type`` header is
            not exactly ``application/json`` gets the unsupported media type
            response instead.
            """
            content_type = request.requestHeaders.getRawHeaders(b"content-type")
            if content_type == ["application/json"]:
                refusal = {u"success": False, u"reason": u"double-spend"}
                return bad_request(request, refusal)
            return bad_content_type(request)
    
    
    class UnpaidRedemption(Resource):
        """
        A stand-in for the Ristretto redemption server which never allows a
        voucher to be redeemed.  Every well-formed request is answered with an
        error whose reason is ``unpaid``.
        """
        def render_POST(self, request):
            """
            Refuse the redemption.  A request whose ``content-type`` header is
            not exactly ``application/json`` gets the unsupported media type
            response instead.
            """
            content_type = request.requestHeaders.getRawHeaders(b"content-type")
            if content_type == ["application/json"]:
                refusal = {u"success": False, u"reason": u"unpaid"}
                return bad_request(request, refusal)
            return bad_content_type(request)
    
    class RistrettoRedemption(Resource):
        """
        A stand-in for the Ristretto redemption server which signs every
        blinded token submitted to it.

        :ivar signing_key: The key used to sign tokens and to create the proof.

        :ivar public_key: The public key reported in responses.  It is
            initially derived from ``signing_key``.
        """
        def __init__(self, signing_key):
            Resource.__init__(self)
            self.signing_key = signing_key
            self.public_key = PublicKey.from_signing_key(signing_key)

        def render_POST(self, request):
            """
            Sign the blinded tokens listed under ``redeemTokens`` in the JSON
            request body and respond with the signatures, a batch proof, and
            ``public_key``, all serialized as JSON.
            """
            content_type = request.requestHeaders.getRawHeaders(b"content-type")
            if content_type != ["application/json"]:
                return bad_content_type(request)

            body = loads(request.content.read())
            encoded_blinded = body[u"redeemTokens"]
            blinded = [
                BlindedToken.decode_base64(encoded.encode("ascii"))
                for encoded in encoded_blinded
            ]
            signed = [
                self.signing_key.sign(token)
                for token in blinded
            ]
            encoded_signed = [
                token.encode_base64()
                for token in signed
            ]

            proof = BatchDLEQProof.create(self.signing_key, blinded, signed)
            try:
                encoded_proof = proof.encode_base64()
            finally:
                # Destroy the proof object whether or not encoding succeeded.
                proof.destroy()

            response = {
                u"success": True,
                u"public-key": self.public_key.encode_base64(),
                u"signatures": encoded_signed,
                u"proof": encoded_proof,
            }
            return dumps(response)
    
    def bad_request(request, body_object):
        """
        Answer ``request`` with the ``BAD_REQUEST`` response code and a JSON
        body.

        :param request: The request being responded to.

        :param body_object: The object to serialize as the JSON response body.

        :return bytes: An empty string; the body is written to the request
            directly.
        """
        request.setResponseCode(BAD_REQUEST)
        request.setHeader(b"content-type", b"application/json")
        payload = dumps(body_object)
        request.write(payload)
        return b""
    
    
    
    def bad_content_type(request):
        """
        Render an ``UNSUPPORTED_MEDIA_TYPE`` error page for ``request``.

        :param request: The request being responded to.

        :return: Whatever rendering the ``ErrorPage`` against ``request``
            produces.
        """
        page = ErrorPage(
            UNSUPPORTED_MEDIA_TYPE,
            b"Unsupported media type",
            b"Unsupported media type",
        )
        return page.render(request)