# Copyright 2019 PrivateStorage.io, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Hypothesis strategies for property testing.
"""
from base64 import (
    b64encode,
    urlsafe_b64encode,
)
from datetime import (
    datetime,
)
import attr
from zope.interface import (
    implementer,
)
from hypothesis.strategies import (
    one_of,
    just,
    none,
    binary,
    characters,
    text,
    integers,
    sets,
    lists,
    tuples,
    dictionaries,
    fixed_dictionaries,
    builds,
    datetimes,
    recursive,
)
from twisted.internet.defer import (
succeed,
)
from twisted.internet.task import (
Clock,
)
from twisted.web.test.requesthelper import (
DummyRequest,
)
from allmydata.interfaces import (
    IFilesystemNode,
    IDirectoryNode,
    HASH_SIZE,
)
from allmydata.client import (
config_from_string,
)
from ..model import (
    Pass,
    RandomToken,
    UnblindedToken,
    Voucher,
    Pending,
    DoubleSpend,
    Redeemed,
)
# Sizes informed by
# https://github.com/brave-intl/challenge-bypass-ristretto/blob/2f98b057d7f353c12b2b12d0f5ae9ad115f1d0ba/src/oprf.rs#L18-L33
# The length of a `Token`, in bytes.
_TOKEN_LENGTH = 96
# The length of an `UnblindedToken`, in bytes.
_UNBLINDED_TOKEN_LENGTH = 96
def _merge_dictionaries(dictionaries):
result = {}
for d in dictionaries:
result.update(d)
return result
def _tahoe_config_quote(text):
return text.replace(u"%", u"%%")
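# Illustrative example (not part of the original module): ConfigParser-style
# interpolation treats "%" specially, so it must be doubled before being
# written into a config file:
#     _tahoe_config_quote(u"100% full") == u"100%% full"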
def _config_string_from_sections(divided_sections):
sections = _merge_dictionaries(divided_sections)
return u"".join(list(
u"[{name}]\n{items}\n".format(
name=name,
items=u"\n".join(
u"{key} = {value}".format(key=key, value=_tahoe_config_quote(value))
for (key, value)
in contents.items()
)
)
for (name, contents) in sections.items()
))
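# A minimal sketch of the output shape, assuming a single "node" section
# (values chosen only for illustration):
#     _config_string_from_sections([{u"node": {u"nickname": u"anon"}}])
#     == u"[node]\nnickname = anon\n"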
def tahoe_config_texts(storage_client_plugins):
"""
Build the text of complete Tahoe-LAFS configurations for a node.
"""
return builds(
lambda *sections: _config_string_from_sections(
sections,
),
fixed_dictionaries(
{
"storageclient.plugins.{}".format(name): configs
for (name, configs)
in storage_client_plugins.items()
},
),
fixed_dictionaries(
{
"node": fixed_dictionaries(
{
"nickname": node_nicknames(),
},
),
"client": fixed_dictionaries(
{
"storage.plugins": just(
u",".join(storage_client_plugins.keys()),
),
},
),
},
),
)
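# A drawn text contains one "[storageclient.plugins.<name>]" section per
# plugin plus the "[node]" and "[client]" sections, roughly (illustrative,
# for a plugin named "demo"; section order may vary):
#     [storageclient.plugins.demo]
#     ...
#     [node]
#     nickname = ...
#     [client]
#     storage.plugins = demo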
def minimal_tahoe_configs(storage_client_plugins=None):
"""
Build complete Tahoe-LAFS configurations for a node.
"""
if storage_client_plugins is None:
storage_client_plugins = {}
return tahoe_config_texts(
storage_client_plugins,
).map(
lambda config_text: lambda basedir, portnumfile: config_from_string(
basedir,
portnumfile,
config_text.encode("utf-8"),
),
)
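# Hypothetical usage in a test (the test name and arguments are assumptions,
# not part of this module).  The drawn value is a factory, not a config, so
# the test supplies the basedir and portnumfile itself:
#     @given(minimal_tahoe_configs())
#     def test_client(self, get_config):
#         config = get_config("/tmp/basedir", "portnum")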
def node_nicknames():
"""
Builds Tahoe-LAFS node nicknames.
"""
return text(
min_size=0,
max_size=16,
alphabet=characters(
blacklist_categories={
# Surrogates
u"Cs",
# Unnamed and control characters
u"Cc",
},
),
)
def server_configurations(signing_key_path):
    """
    Build configuration values for the server-side plugin.

    :param signing_key_path: A ``FilePath`` whose ``path`` is inserted as
        the **ristretto-signing-key-path** item.
    """
    return just({
        u"ristretto-issuer-root-url": u"https://issuer.example.invalid/",
        u"ristretto-signing-key-path": signing_key_path.path,
    })
def client_ristrettoredeemer_configurations():
    """
    Build Ristretto-using configuration values for the client-side plugin.
    """
    return just({
        u"ristretto-issuer-root-url": u"https://issuer.example.invalid/",
        u"redeemer": u"ristretto",
    })
def client_dummyredeemer_configurations():
"""
Build DummyRedeemer-using configuration values for the client-side plugin.
"""
return just({
u"redeemer": u"dummy",
})
def client_doublespendredeemer_configurations():
"""
Build DoubleSpendRedeemer-using configuration values for the client-side plugin.
"""
return just({
u"redeemer": u"double-spend",
})
def client_unpaidredeemer_configurations():
"""
Build UnpaidRedeemer-using configuration values for the client-side plugin.
"""
return just({
u"redeemer": u"unpaid",
})
def client_nonredeemer_configurations():
"""
Build NonRedeemer-using configuration values for the client-side plugin.
"""
return just({
u"redeemer": u"non",
})
def client_errorredeemer_configurations(details):
"""
Build ErrorRedeemer-using configuration values for the client-side plugin.
"""
return just({
u"redeemer": u"error",
u"details": details,
})
def tahoe_configs(zkapauthz_v1_configuration=client_dummyredeemer_configurations()):
"""
Build complete Tahoe-LAFS configurations including the zkapauthorizer
client plugin section.
"""
return minimal_tahoe_configs({
u"privatestorageio-zkapauthz-v1": zkapauthz_v1_configuration,
def vouchers():
    """
    Build unicode strings in the format of vouchers.
    """
    return binary(
        min_size=32,
        max_size=32,
    ).map(
        urlsafe_b64encode,
    ).map(
        lambda voucher: voucher.decode("ascii"),
    )
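# The urlsafe-base64 encoding of 32 bytes is always 44 characters, so every
# drawn voucher is a 44-character unicode string, e.g.:
#     urlsafe_b64encode(b"\x00" * 32).decode("ascii") == u"A" * 43 + u"="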
def voucher_states():
"""
Build unicode strings giving states a Voucher can be in.
"""
return one_of(
just(Pending()),
builds(
DoubleSpend,
finished=datetimes(),
),
builds(
Redeemed,
finished=datetimes(),
            token_count=integers(min_value=1),
        ),
    )
def voucher_objects():
    """
    Build ``Voucher`` instances.
    """
    return builds(
        Voucher,
        number=vouchers(),
        created=one_of(none(), datetimes()),
    )
def byte_strings(label, length, entropy):
    """
    Build byte strings of the given length with at most the given amount of
    entropy.

    These are cheaper for Hypothesis to construct than byte strings where
    potentially the entire length is random.
    """
    if len(label) + entropy > length:
        raise ValueError("Entropy and label don't fit into {} bytes".format(
            length,
        ))
    return binary(
        min_size=entropy,
        max_size=entropy,
    ).map(
        lambda bs: label + b"x" * (length - entropy - len(label)) + bs,
    )
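# A sketch of the shape produced, assuming label=b"ex", length=8, entropy=2
# (values chosen only for illustration):
#     b"ex" + b"xxxx" + <2 random bytes>
# Only the trailing `entropy` bytes vary, which keeps generation and
# shrinking much cheaper than drawing 8 fully random bytes.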
def random_tokens():
"""
Build ``RandomToken`` instances.
"""
return byte_strings(
label=b"random-tokens",
length=_TOKEN_LENGTH,
entropy=4,
).map(
b64encode,
).map(
lambda token: RandomToken(token.decode("ascii")),
)
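# Note: 96 bytes base64-encode to exactly 128 characters with no padding
# (96 is divisible by 3), so every drawn ``RandomToken`` wraps a
# 128-character ASCII string.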
def zkaps():
"""
    Build random ZKAPs as ``Pass`` instances.
"""
return builds(
lambda preimage, signature: Pass(u"{} {}".format(preimage, signature)),
# Sizes informed by
# https://github.com/brave-intl/challenge-bypass-ristretto/blob/2f98b057d7f353c12b2b12d0f5ae9ad115f1d0ba/src/oprf.rs#L18-L33
preimage=binary(min_size=64, max_size=64).map(urlsafe_b64encode),
        signature=binary(min_size=64, max_size=64).map(urlsafe_b64encode),
    )
def unblinded_tokens():
"""
    Build random ``_zkapauthorizer.model.UnblindedToken`` instances wrapping
    invalid base64-encoded data.  You cannot use these in the PrivacyPass
    cryptographic protocol, but you can put them into the database and take
    them out again.
"""
return byte_strings(
label=b"unblinded-tokens",
length=_UNBLINDED_TOKEN_LENGTH,
entropy=4,
).map(
lambda zkap: UnblindedToken(zkap.decode("ascii")),
)
def request_paths():
"""
Build lists of unicode strings that represent the path component of an
HTTP request.
:see: ``requests``
"""
def requests(paths=request_paths()):
"""
Build objects providing ``twisted.web.iweb.IRequest``.
"""
return builds(
DummyRequest,
paths,
)
def storage_indexes():
"""
Build Tahoe-LAFS storage indexes.
"""
return binary(
# It is tempting to use StorageIndex.minLength and
# StorageIndex.maxLength but these are effectively garbage. See the
# implementation of ByteStringConstraint for details.
min_size=16,
max_size=16,
)
def lease_renew_secrets():
"""
Build Tahoe-LAFS lease renewal secrets.
"""
return binary(
min_size=HASH_SIZE,
max_size=HASH_SIZE,
)
def lease_cancel_secrets():
"""
Build Tahoe-LAFS lease cancellation secrets.
"""
return binary(
min_size=HASH_SIZE,
max_size=HASH_SIZE,
)
def write_enabler_secrets():
"""
Build Tahoe-LAFS write enabler secrets.
"""
return binary(
min_size=HASH_SIZE,
        max_size=HASH_SIZE,
    )
def sharenums():
"""
Build Tahoe-LAFS share numbers.
"""
return integers(
min_value=0,
max_value=255,
)
def sharenum_sets():
"""
Build sets of Tahoe-LAFS share numbers.
"""
return sets(
sharenums(),
min_size=1,
)
def sizes():
"""
Build Tahoe-LAFS share sizes.
"""
return integers(
# Size 0 data isn't data, it's nothing.
min_value=1,
# For the moment there are some assumptions in the test suite that
# limit us to an amount of storage that can be paid for with one ZKAP.
# That will be fixed eventually. For now, keep the sizes pretty low.
max_value=2 ** 16,
)
def offsets():
"""
Build Tahoe-LAFS share offsets.
"""
return integers(
min_value=0,
# Just for practical purposes...
        max_value=2 ** 16,
    )
def bytes_for_share(sharenum, size):
"""
:return bytes: marginally distinctive bytes of a certain length for the
given share number
"""
if 0 <= sharenum <= 255:
return (unichr(sharenum) * size).encode("latin-1")
raise ValueError("Sharenum must be between 0 and 255 inclusive.")
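# Illustrative example: bytes_for_share(65, 4) == b"AAAA".  Every byte is
# the share number, so different shares are easy to tell apart in test
# failure output.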
def shares():
"""
Build Tahoe-LAFS share data.
"""
return tuples(
sharenums(),
sizes()
).map(
lambda num_and_size: bytes_for_share(*num_and_size),
)
def data_vectors():
"""
Build Tahoe-LAFS data vectors.
"""
return lists(
tuples(
offsets(),
shares(),
),
# An empty data vector doesn't make much sense. If you have no data
# to write, you should probably use slot_readv instead. Also,
# Tahoe-LAFS explodes if you pass an empty data vector -
# storage/server.py, OSError(ENOENT) from `os.listdir(bucketdir)`.
min_size=1,
# Just for practical purposes...
max_size=8,
)
def test_vectors():
"""
Build Tahoe-LAFS test vectors.
"""
return lists(
# XXX TODO
just(None),
min_size=0,
max_size=0,
)
@attr.s(frozen=True)
class TestAndWriteVectors(object):
"""
Provide an alternate structure for the values required by the
``tw_vectors`` parameter accepted by
``RIStorageServer.slot_testv_and_readv_and_writev``.
"""
test_vector = attr.ib()
write_vector = attr.ib()
new_length = attr.ib()
def for_call(self):
"""
Construct a value suitable to be passed as ``tw_vectors`` to
``slot_testv_and_readv_and_writev``.
"""
return (self.test_vector, self.write_vector, self.new_length)
def test_and_write_vectors():
"""
Build Tahoe-LAFS test and write vectors for a single share.
"""
return builds(
TestAndWriteVectors,
test_vectors(),
data_vectors(),
one_of(
just(None),
sizes(),
),
)
def test_and_write_vectors_for_shares():
"""
Build Tahoe-LAFS test and write vectors for a number of shares.
"""
return dictionaries(
sharenums(),
test_and_write_vectors(),
# An empty dictionary wouldn't make much sense. And it provokes a
# NameError from Tahoe, storage/server.py:479, `new_length` referenced
# before assignment.
min_size=1,
        # Just for practical purposes...
        max_size=4,
    )
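# The drawn dictionary, once ``for_call`` is applied to each value, has the
# shape ``tw_vectors`` expects, e.g. (illustrative):
#     {0: ([], [(0, b"\x01\x01")], None)}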
def announcements():
    """
    Build announcements for the ZKAPAuthorizer plugin.
    """
    return just({
        u"ristretto-issuer-root-url": u"https://issuer.example.invalid/",
    })
_POSIX_EPOCH = datetime.utcfromtimestamp(0)
def posix_safe_datetimes():
"""
Build datetime instances in a range that can be represented as floats
without losing microsecond precision.
"""
return datetimes(
# I don't know that time-based parts of the system break down
# before the POSIX epoch but I don't know that they work, either.
# Don't time travel with this code.
min_value=_POSIX_EPOCH,
# Once we get far enough into the future we lose the ability to
# represent a timestamp with microsecond precision in a floating point
# number, which we do with any POSIX timestamp-like API (eg
# twisted.internet.task.Clock). So don't go far enough into the
        # future. Furthermore, once we don't fit into an unsigned 4 byte
        # integer, we can't round-trip through all the things that expect a
# time_t. Stay back from the absolute top to give tests a little
# space to advance time, too.
max_value=datetime.utcfromtimestamp(2 ** 31),
)
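# A quick check of the upper bound (illustrative arithmetic): a timestamp
# near 2 ** 31 seconds with microsecond precision needs about
# log2(2 ** 31 * 10 ** 6) ~= 51 bits of mantissa, which fits inside the 53
# bits an IEEE 754 double provides.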
def clocks(now=posix_safe_datetimes()):
"""
Build ``twisted.internet.task.Clock`` instances set to a time built by
``now``.
"""
def clock_at_time(when):
c = Clock()
c.advance((when - _POSIX_EPOCH).total_seconds())
return c
return now.map(clock_at_time)
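# Hypothetical usage in a test (names assumed for illustration):
#     @given(clocks())
#     def test_expiry(self, clock):
#         clock.advance(3600)  # one hour past the drawn starting time
#         now = clock.seconds()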
@implementer(IFilesystemNode)
@attr.s
class _LeafNode(object):
_storage_index = attr.ib()
def get_storage_index(self):
return self._storage_index
# For testing
def flatten(self):
return [self]
def leaf_nodes():
    """
    Build ``IFilesystemNode`` providers which have no children.
    """
    return storage_indexes().map(_LeafNode)
@implementer(IDirectoryNode)
@attr.s
class _DirectoryNode(object):
_storage_index = attr.ib()
_children = attr.ib()
def list(self):
return succeed(self._children)
def get_storage_index(self):
return self._storage_index
# For testing
def flatten(self):
result = [self]
for (node, _) in self._children.values():
result.extend(node.flatten())
return result
def directory_nodes(child_strategy):
"""
Build directory nodes with children drawn from the given strategy.
"""
children = dictionaries(
text(),
tuples(
child_strategy,
just({}),
),
)
return builds(
_DirectoryNode,
storage_indexes(),
children,
)
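# A drawn node's ``list`` result maps child names to (node, metadata)
# tuples, e.g. (illustrative): {u"child": (_LeafNode(b"x" * 16), {})}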
def node_hierarchies():
"""
Build hierarchies of ``IDirectoryNode`` and other ``IFilesystemNode``
(incomplete) providers.
"""
def storage_indexes_are_distinct(nodes):
seen = set()
for n in nodes.flatten():
si = n.get_storage_index()
if si in seen:
return False
seen.add(si)
return True
return recursive(
leaf_nodes(),
directory_nodes,
).filter(
        storage_indexes_are_distinct,
    )