Newer
Older
# Copyright 2019 PrivateStorage.io, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Tests for ``_zkapauthorizer.lease_maintenance``.
"""
from __future__ import (
absolute_import,
unicode_literals,
)
from datetime import (
from zope.interface import (
implementer,
)
Always,
HasLength,
MatchesAll,
AfterPreprocessing,
from testtools.twistedsupport import (
from fixtures import (
TempDir,
)
from hypothesis import (
given,
from hypothesis.strategies import (
builds,
binary,
integers,
lists,
composite,
just,
from twisted.python.filepath import (
FilePath,
)
from twisted.internet.task import (
Clock,
)
from twisted.internet.defer import (
succeed,
)
from twisted.application.service import (
IService,
)
from allmydata.util.hashutil import (
CRYPTO_VAL_SIZE,
)
from allmydata.client import (
SecretHolder,
)
from allmydata.interfaces import (
IStorageBroker,
IServer,
)
from ..foolscap import (
ShareStat,
)
from .matchers import (
Provides,
Jean-Paul Calderone
committed
leases_current,
)
from .strategies import (
storage_indexes,
node_hierarchies,
from ..lease_maintenance import (
NoopMaintenanceObserver,
MemoryMaintenanceObserver,
maintain_leases_from_root,
visit_storage_indexes_from_root,
def interval_means():
return floats(
# It doesn't make sense to have a negative check interval mean.
min_value=0,
# We can't make this value too large or it isn't convertable to a
# timedelta. Also, even values as large as this one are of
# questionable value.
max_value=60 * 60 * 24 * 365,
).map(
# By representing the result as a timedelta we avoid the cases where
# the lower precision of timedelta compared to float drops the whole
# value (anything between 0 and 1 microsecond). This is just one
# example of how working with timedeltas is nicer, in general.
lambda s: timedelta(seconds=s),
)
def dummy_maintain_leases():
pass
@attr.s
class DummyStorageServer(object):
"""
A dummy implementation of ``IStorageServer`` from Tahoe-LAFS.
:ivar dict[bytes, ShareStat] buckets: A mapping from storage index to
metadata about shares at that storage index.
"""
clock = attr.ib()
buckets = attr.ib()
lease_seed = attr.ib()
def stat_shares(self, storage_indexes):
return succeed(list(
{0: self.buckets[idx]} if idx in self.buckets else {}
for idx
in storage_indexes
))
def get_lease_seed(self):
return self.lease_seed
def renew_lease(self, storage_index, renew_secret):
self.buckets[storage_index].lease_expiration = (
self.clock.seconds() + timedelta(days=31).total_seconds()
)
def lease_seeds():
return binary(
min_size=20,
max_size=20,
def share_stats():
return builds(
ShareStat,
size=integers(min_value=0),
lease_expiration=integers(min_value=0, max_value=2 ** 31),
)
def storage_servers(clocks):
return builds(
DummyStorageServer,
clocks,
dictionaries(storage_indexes(), share_stats()),
lease_seeds(),
).map(
DummyServer,
@implementer(IServer)
@attr.s
class DummyServer(object):
"""
A partial implementation of a Tahoe-LAFS "native" storage server.
"""
_storage_server = attr.ib()
def get_storage_server(self):
return self._storage_server
@implementer(IStorageBroker)
@attr.s
class DummyStorageBroker(object):
"""
A partial implementation of a Tahoe-LAFS storage broker.
"""
clock = attr.ib()
_storage_servers = attr.ib()
def get_connected_servers(self):
return self._storage_servers
@composite
def storage_brokers(draw, clocks):
clock = draw(clocks)
return DummyStorageBroker(
clock,
draw(lists(storage_servers(just(clock)))),
)
class LeaseMaintenanceServiceTests(TestCase):
"""
Tests for the service returned by ``lease_maintenance_service``.
"""
@given(randoms())
def test_interface(self, random):
"""
The service provides ``IService``.
"""
clock = Clock()
service = lease_maintenance_service(
FilePath(self.useFixture(TempDir()).join(u"last-run")),
random,
)
self.assertThat(
service,
Provides([IService]),
)
@given(
randoms(),
interval_means(),
)
def test_initial_interval(self, random, mean):
"""
When constructed without a value for ``last_run``,
``lease_maintenance_service`` schedules its first run to take place
after an interval that falls uniformly in range centered on ``mean``
with a size of ``range``.
"""
clock = Clock()
# Construct a range that fits in with the mean
range_ = timedelta(
seconds=random.uniform(0, mean.total_seconds()),
)
service = lease_maintenance_service(
FilePath(self.useFixture(TempDir()).join(u"last-run")),
mean,
range_,
)
service.startService()
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
[maintenance_call] = clock.getDelayedCalls()
datetime_now = datetime.utcfromtimestamp(clock.seconds())
low = datetime_now + mean - (range_ / 2)
high = datetime_now + mean + (range_ / 2)
self.assertThat(
datetime.utcfromtimestamp(maintenance_call.getTime()),
between(low, high),
)
@given(
randoms(),
clocks(),
interval_means(),
interval_means(),
)
def test_initial_interval_with_last_run(self, random, clock, mean, since_last_run):
"""
When constructed with a value for ``last_run``,
``lease_maintenance_service`` schedules its first run to take place
sooner than it otherwise would, by at most the time since the last
run.
"""
datetime_now = datetime.utcfromtimestamp(clock.seconds())
# Construct a range that fits in with the mean
range_ = timedelta(
seconds=random.uniform(0, mean.total_seconds()),
)
# Figure out the absolute last run time.
last_run = datetime_now - since_last_run
last_run_path = FilePath(self.useFixture(TempDir()).join(u"last-run"))
last_run_path.setContent(last_run.isoformat())
service = lease_maintenance_service(
random,
mean,
range_,
)
service.startService()
[maintenance_call] = clock.getDelayedCalls()
low = datetime_now + max(
timedelta(0),
mean - (range_ / 2) - since_last_run,
)
high = max(
# If since_last_run is one microsecond (precision of timedelta)
# then the range is indivisible. Avoid putting the expected high
# below the expected low.
low,
datetime_now + mean + (range_ / 2) - since_last_run,
)
note("mean: {}\nrange: {}\nnow: {}\nlow: {}\nhigh: {}\nsince last: {}".format(
mean, range_, datetime_now, low, high, since_last_run,
))
self.assertThat(
datetime.utcfromtimestamp(maintenance_call.getTime()),
between(low, high),
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
@given(
randoms(),
clocks(),
)
def test_clean_up_when_stopped(self, random, clock):
"""
When the service is stopped, the delayed call in the reactor is removed.
"""
service = lease_maintenance_service(
lambda: None,
clock,
FilePath(self.useFixture(TempDir()).join(u"last-run")),
random,
)
service.startService()
self.assertThat(
maybeDeferred(service.stopService),
succeeded(Is(None)),
)
self.assertThat(
clock.getDelayedCalls(),
Equals([]),
)
self.assertThat(
service.running,
Equals(False),
)
@given(
randoms(),
clocks(),
)
def test_nodes_visited(self, random, clock):
"""
When the service runs, it calls the ``maintain_leases`` object.
"""
leases_maintained_at = []
def maintain_leases():
leases_maintained_at.append(datetime.utcfromtimestamp(clock.seconds()))
service = lease_maintenance_service(
maintain_leases,
clock,
FilePath(self.useFixture(TempDir()).join(u"last-run")),
random,
)
service.startService()
[maintenance_call] = clock.getDelayedCalls()
clock.advance(maintenance_call.getTime() - clock.seconds())
self.assertThat(
leases_maintained_at,
Equals([datetime.utcfromtimestamp(clock.seconds())]),
)
class VisitStorageIndexesFromRootTests(TestCase):
Tests for ``visit_storage_indexes_from_root``.
"""
@given(node_hierarchies(), clocks())
def test_visits_all_nodes(self, root_node, clock):
"""
The operation calls the specified visitor with every node from the root to
its deepest children.
"""
visited = []
def perform_visit(visit_assets):
return visit_assets(visited.append)
operation = visit_storage_indexes_from_root(
perform_visit,
)
self.assertThat(
operation(),
succeeded(Always()),
)
expected = root_node.flatten()
self.assertThat(
visited,
MatchesAll(
HasLength(len(expected)),
AfterPreprocessing(
set,
Equals(set(
node.get_storage_index()
for node
in expected
)),
),
),
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
class RenewLeasesTests(TestCase):
"""
Tests for ``renew_leases``.
"""
@given(storage_brokers(clocks()), lists(leaf_nodes()))
def test_renewed(self, storage_broker, nodes):
"""
``renew_leases`` renews the leases of shares on all storage servers which
have no more than the specified amount of time remaining on their
current lease.
"""
lease_secret = b"\0" * CRYPTO_VAL_SIZE
convergence_secret = b"\1" * CRYPTO_VAL_SIZE
secret_holder = SecretHolder(lease_secret, convergence_secret)
min_lease_remaining = timedelta(days=3)
def get_now():
return datetime.utcfromtimestamp(
storage_broker.clock.seconds(),
)
def visit_assets(visit):
for node in nodes:
visit(node.get_storage_index())
return succeed(None)
d = renew_leases(
visit_assets,
storage_broker,
secret_holder,
min_lease_remaining,
NoopMaintenanceObserver,
get_now,
)
self.assertThat(
d,
succeeded(Always()),
)
relevant_storage_indexes = set(
node.get_storage_index()
for node
in nodes
)
self.assertThat(
list(
server.get_storage_server()
for server
in storage_broker.get_connected_servers()
),
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
AllMatch(leases_current(
relevant_storage_indexes,
get_now(),
min_lease_remaining,
)),
)
class MaintainLeasesFromRootTests(TestCase):
"""
Tests for ``maintain_leases_from_root``.
"""
@given(storage_brokers(clocks()), node_hierarchies())
def test_renewed(self, storage_broker, root_node):
"""
``maintain_leases_from_root`` creates an operation which renews the leases
of shares on all storage servers which have no more than the specified
amount of time remaining on their current lease.
"""
lease_secret = b"\0" * CRYPTO_VAL_SIZE
convergence_secret = b"\1" * CRYPTO_VAL_SIZE
secret_holder = SecretHolder(lease_secret, convergence_secret)
min_lease_remaining = timedelta(days=3)
def get_now():
return datetime.utcfromtimestamp(
storage_broker.clock.seconds(),
)
operation = maintain_leases_from_root(
storage_broker,
secret_holder,
min_lease_remaining,
NoopMaintenanceObserver,
get_now,
)
d = operation()
self.assertThat(
d,
succeeded(Always()),
)
relevant_storage_indexes = set(
node.get_storage_index()
for node
in root_node.flatten()
)
self.assertThat(
list(
server.get_storage_server()
for server
in storage_broker.get_connected_servers()
),
AllMatch(leases_current(
relevant_storage_indexes,
get_now(),
min_lease_remaining,
))
@given(storage_brokers(clocks()), node_hierarchies())
def test_activity_observed(self, storage_broker, root_node):
"""
``maintain_leases_from_root`` creates an operation which uses the given
activity observer to report its progress.
"""
lease_secret = b"\0" * CRYPTO_VAL_SIZE
convergence_secret = b"\1" * CRYPTO_VAL_SIZE
secret_holder = SecretHolder(lease_secret, convergence_secret)
min_lease_remaining = timedelta(days=3)
def get_now():
return datetime.utcfromtimestamp(
storage_broker.clock.seconds(),
)
observer = MemoryMaintenanceObserver()
# There is only one available.
observers = [observer]
progress = observers.pop
operation = maintain_leases_from_root(
storage_broker,
secret_holder,
min_lease_remaining,
progress,
get_now,
)
d = operation()
self.assertThat(
d,
succeeded(Always()),
)
expected = []
for node in root_node.flatten():
for server in storage_broker.get_connected_servers():
stat = server.get_storage_server().buckets[node.get_storage_index()]
except KeyError:
continue
else:
# DummyStorageServer always pretends to have only one share
expected.append([stat.size])
# The visit order doesn't matter.
expected.sort()
self.assertThat(
observer.observed,
AfterPreprocessing(
sorted,
Equals(expected),
),