diff --git a/src/_zkapauthorizer/recover.py b/src/_zkapauthorizer/recover.py index e5b158085f6dc0f46f20ff2a13e698069b7c8739..9b9eb9509d1ac2576dc3d8f3e15ae0b3b397bdb5 100644 --- a/src/_zkapauthorizer/recover.py +++ b/src/_zkapauthorizer/recover.py @@ -19,7 +19,19 @@ from io import BytesIO from sqlite3 import Cursor from typing import BinaryIO, Callable, Dict, Iterator, Optional +from allmydata.node import _Config from attrs import define +from hyperlink import DecodedURL +from treq.client import HTTPClient +from twisted.python.filepath import FilePath + +from .tahoe import download + + +class SnapshotMissing(Exception): + """ + No snapshot was not found in the replica directory. + """ class AlreadyRecovering(Exception): @@ -183,3 +195,45 @@ def recover(statements: Iterator[str], cursor) -> None: """ for sql in statements: cursor.execute(sql) + + +async def tahoe_lafs_downloader( + treq: HTTPClient, + node_config: _Config, + recovery_cap: str, + set_state: SetState, +) -> Awaitable: # Awaitable[FilePath] + """ + Download replica data from the given replica directory capability into the + node's private directory. + """ + api_root = DecodedURL.from_text( + FilePath(node_config.get_config_path("node.url")) + .getContent() + .decode("ascii") + .strip() + ) + snapshot_path = FilePath(node_config.get_private_path("snapshot.sql")) + + set_state(RecoveryState(stage=RecoveryStages.downloading)) + await download(treq, snapshot_path, api_root, recovery_cap, ["snapshot.sql"]) + return snapshot_path + + +def get_tahoe_lafs_downloader( + httpclient: HTTPClient, node_config: _Config +) -> Callable[[str], Downloader]: + """ + Bind some parameters to ``tahoe_lafs_downloader`` in a convenient way. + + :return: A callable that accepts a Tahoe-LAFS capability string and + returns a downloader for that capability. + """ + + def get_downloader(cap_str): + def downloader(set_state): + return tahoe_lafs_downloader(httpclient, node_config, cap_str, set_state) + + return downloader + + return get_downloader diff --git a/src/_zkapauthorizer/tahoe.py b/src/_zkapauthorizer/tahoe.py index d8f4875226711d494cc525d743fe1b15ddd9cd36..d9271d042446b4302e9e4cd745321d0f8304b932 100644 --- a/src/_zkapauthorizer/tahoe.py +++ b/src/_zkapauthorizer/tahoe.py @@ -2,12 +2,14 @@ A library for interacting with a Tahoe-LAFS node. """ +from base64 import b32encode from collections.abc import Awaitable from functools import wraps -from typing import Callable, List +from hashlib import sha256 +from typing import Callable, Iterable, List, Optional import treq -from attrs import define +from attrs import define, field from hyperlink import DecodedURL from treq.client import HTTPClient from twisted.python.filepath import FilePath @@ -45,12 +47,42 @@ def _not_enough_servers(exc: Exception) -> bool: Match the exception that is raised when the Tahoe-LAFS client node is not connected to enough servers to satisfy the encoding configuration. """ - return isinstance( - exc, TahoeAPIError - ) and "allmydata.interfaces.NoServersError" in str(exc) + return isinstance(exc, TahoeAPIError) and ( + "allmydata.interfaces.NoServersError" in str(exc) + or "allmydata.mutable.common.NotEnoughServersError" in str(exc) + ) -@define +def _scrub_cap(cap: str) -> str: + """ + Return a new string that cannot be used to recover the input string but + can usually be distinguished from the scrubbed version of a different + input string. + """ + scrubbed = b32encode(sha256(cap.encode("ascii")).digest())[:6].lower() + return f"URI:SCRUBBED:{scrubbed}" + + +def _scrub_caps_from_url(url: DecodedURL) -> DecodedURL: + """ + Return a new URL that is like ``url`` but has all capability strings in it + replaced with distinct but unusable substitutes. + """ + # One form is like /uri/<cap> + if ( + len(url.path) > 1 + and url.path[0] == "uri" + and not url.path[1].startswith("URI:SCRUBBED:") + ): + cap = url.path[1] + new = url.replace(path=(url.path[0], _scrub_cap(cap)) + url.path[2:]) + return new + + # That is the only form we use at the moment, in fact. + return url + + +@define(frozen=True, auto_exc=False) class TahoeAPIError(Exception): """ Some error was reported from a Tahoe-LAFS HTTP API. @@ -59,6 +91,8 @@ class TahoeAPIError(Exception): :ivar body: The HTTP response body. """ + method: str + url: DecodedURL = field(converter=_scrub_caps_from_url) status: int body: str @@ -88,16 +122,21 @@ async def upload( :raise: If there is a problem uploading the data -- except for unavailability of storage servers -- then some exception is raised. """ + uri = api_root.child("uri") with inpath.open() as f: - resp = await client.put(api_root.child("uri"), f) + resp = await client.put(uri, f) content = (await treq.content(resp)).decode("utf-8") if resp.code in (200, 201): return content - raise TahoeAPIError(resp.code, content) + raise TahoeAPIError("put", uri, resp.code, content) async def download( - client: HTTPClient, outpath: FilePath, api_root: DecodedURL, cap: str + client: HTTPClient, + outpath: FilePath, + api_root: DecodedURL, + cap: str, + child_path: Optional[Iterable[str]] = None, ) -> Awaitable: # Awaitable[None] but this requires Python 3.9 """ Download the object identified by the given capability to the given path. @@ -119,11 +158,57 @@ async def download( """ outtemp = outpath.temporarySibling() - resp = await client.get(api_root.child("uri", cap).to_text()) + uri = api_root.child("uri").child(cap) + if child_path is not None: + for segment in child_path: + uri = uri.child(segment) + + resp = await client.get(uri) if resp.code == 200: with outtemp.open("w") as f: await treq.collect(resp, f.write) outtemp.moveTo(outpath) else: content = (await treq.content(resp)).decode("utf-8") - raise TahoeAPIError(resp.code, content) + raise TahoeAPIError("get", uri, resp.code, content) + + +@async_retry([_not_enough_servers]) +async def make_directory( + client: HTTPClient, + api_root: DecodedURL, +) -> Awaitable: # Awaitable[str] but this requires Python 3.9 + """ + Create a new mutable directory and return the write capability string. + """ + uri = api_root.child("uri").add("t", "mkdir") + resp = await client.post(uri) + content = (await treq.content(resp)).decode("utf-8") + if resp.code == 200: + return content + raise TahoeAPIError("post", uri, resp.code, content) + + +@async_retry([_not_enough_servers]) +async def link( + client: HTTPClient, + api_root: DecodedURL, + dir_cap: str, + entry_name: str, + entry_cap: str, +) -> Awaitable: + """ + Link an object into a directory. + + :param dir_cap: The capability string of the directory in which to create + the link. + + :param entry_cap: The capability string of the object to link in to the + directory. + """ + uri = api_root.child("uri").child(dir_cap).child(entry_name).add("t", "uri") + resp = await client.put(uri, data=entry_cap.encode("ascii")) + content = (await treq.content(resp)).decode("utf-8") + if resp.code == 200: + return None + raise TahoeAPIError("put", uri, resp.code, content) diff --git a/src/_zkapauthorizer/tests/__init__.py b/src/_zkapauthorizer/tests/__init__.py index 12772a0529efb022a4c8d1b8a739c665686f9a4e..37705546e369f7b40cc2cf88876d37ddbc581454 100644 --- a/src/_zkapauthorizer/tests/__init__.py +++ b/src/_zkapauthorizer/tests/__init__.py @@ -67,3 +67,38 @@ def _configure_hypothesis(): _configure_hypothesis() + + +def _monkeypatch_tahoe_3874(): + # Fix https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3874 + from allmydata.testing.web import _FakeTahoeUriHandler + from hyperlink import DecodedURL + from twisted.web import http + + def render_GET(self, request): + uri = DecodedURL.from_text(request.uri.decode("utf8")) + capability = None + for arg, value in uri.query: + if arg == "uri": + capability = value.encode("ascii") + # it's legal to use the form "/uri/<capability>" + if capability is None and request.postpath and request.postpath[0]: + capability = request.postpath[0] + + # if we don't yet have a capability, that's an error + if capability is None: + request.setResponseCode(http.BAD_REQUEST) + return b"GET /uri requires uri=" + + # the user gave us a capability; if our Grid doesn't have any + # data for it, that's an error. + if capability not in self.data: + request.setResponseCode(http.BAD_REQUEST) + return "No data for '{}'".format(capability.decode("ascii")) + + return self.data[capability] + + _FakeTahoeUriHandler.render_GET = render_GET + + +_monkeypatch_tahoe_3874() diff --git a/src/_zkapauthorizer/tests/test_recover.py b/src/_zkapauthorizer/tests/test_recover.py index a92dad823e20645c7d0a16a100b5e5d5d948ae5a..49106883d110cd00a10b21309ba33474563f6724 100644 --- a/src/_zkapauthorizer/tests/test_recover.py +++ b/src/_zkapauthorizer/tests/test_recover.py @@ -5,6 +5,8 @@ Tests for ``_zkapauthorizer.recover``, the replication recovery system. from sqlite3 import Connection, connect from typing import Dict, Iterator +from allmydata.client import read_config +from fixtures import TempDir from hypothesis import assume, note, settings from hypothesis.stateful import ( RuleBasedStateMachine, @@ -14,6 +16,7 @@ from hypothesis.stateful import ( run_state_machine_as_test, ) from hypothesis.strategies import data, lists, randoms, sampled_from +from testresources import setUpResources, tearDownResources from testtools import TestCase from testtools.matchers import ( AfterPreprocessing, @@ -22,20 +25,25 @@ from testtools.matchers import ( IsInstance, MatchesStructure, ) -from testtools.twistedsupport import failed, succeeded -from twisted.internet.defer import Deferred +from testtools.twistedsupport import AsynchronousDeferredRunTest, failed, succeeded +from twisted.internet.defer import Deferred, inlineCallbacks +from twisted.python.filepath import FilePath from ..recover import ( AlreadyRecovering, RecoveryStages, StatefulRecoverer, + get_tahoe_lafs_downloader, make_canned_downloader, make_fail_downloader, noop_downloader, recover, ) +from ..tahoe import link, make_directory, upload +from .fixtures import Treq from .sql import Table, create_table from .strategies import deletes, inserts, sql_identifiers, tables, updates +from .test_tahoe import _client_manager def snapshot(connection: Connection) -> Iterator[str]: @@ -137,7 +145,7 @@ class StatefulRecoverTests(TestCase): # # Also try to play along with any profile that has been loaded. max_examples = settings.default.max_examples * 10 - stateful_step_count = int(max(1, settings.default.stateful_step_count / 10)) + stateful_step_count = int(max(3, settings.default.stateful_step_count / 10)) run_state_machine_as_test( lambda: SnapshotMachine(self), @@ -261,3 +269,60 @@ class StatefulRecovererTests(TestCase): ), ), ) + + +class TahoeLAFSDownloaderTests(TestCase): + """ + Tests for ``get_tahoe_lafs_downloader`` and ``tahoe_lafs_downloader``. + """ + + # Support test methods that return a Deferred. + run_tests_with = AsynchronousDeferredRunTest.make_factory(timeout=60.0) + + # Get a Tahoe-LAFS client node connected to a storage node. + resources = [("client", _client_manager)] + + def setUp(self): + super().setUp() + setUpResources(self, self.resources, None) + self.addCleanup(lambda: tearDownResources(self, self.resources, None)) + + @inlineCallbacks + def test_get_downloader(self): + """ + ``get_tahoe_lafs_downloader`` returns a downloader factory that can be + used to download objects using a Tahoe-LAFS client. + """ + snapshot_path = FilePath(self.useFixture(TempDir()).join("snapshot-source")) + snapshot_path.setContent(b"snapshot data") + + config = read_config(self.client.node_dir.path, "tub.port") + # AsynchronousDeferredRunTest sets reactor on us. + httpclient = self.useFixture(Treq(self.reactor, case=self)).client() + + replica_dir_cap_str = yield Deferred.fromCoroutine( + make_directory(httpclient, self.client.node_url), + ) + snapshot_cap_str = yield Deferred.fromCoroutine( + upload(httpclient, snapshot_path, self.client.node_url) + ) + yield Deferred.fromCoroutine( + link( + httpclient, + self.client.node_url, + replica_dir_cap_str, + "snapshot.sql", + snapshot_cap_str, + ) + ) + + get_downloader = get_tahoe_lafs_downloader(httpclient, config) + download = get_downloader(replica_dir_cap_str) + + downloaded_snapshot_path = yield Deferred.fromCoroutine( + download(lambda state: None) + ) + self.assertThat( + downloaded_snapshot_path.getContent(), + Equals(snapshot_path.getContent()), + ) diff --git a/src/_zkapauthorizer/tests/test_tahoe.py b/src/_zkapauthorizer/tests/test_tahoe.py index d88910e6043c3bad0e4106b476eb694fcc483b83..efedf3f300a24c142279e8b30f86e9e26831729e 100644 --- a/src/_zkapauthorizer/tests/test_tahoe.py +++ b/src/_zkapauthorizer/tests/test_tahoe.py @@ -9,18 +9,29 @@ from tempfile import mkdtemp from time import sleep from typing import Iterator, Optional +from allmydata.test.strategies import write_capabilities from attrs import define from fixtures import TempDir from hyperlink import DecodedURL +from hypothesis import given +from hypothesis.strategies import integers, lists, sampled_from, text, tuples from testresources import TestResourceManager, setUpResources, tearDownResources from testtools import TestCase -from testtools.matchers import Equals, Is, raises +from testtools.matchers import Equals, Is, Not, raises from testtools.twistedsupport import AsynchronousDeferredRunTest -from twisted.internet.defer import ensureDeferred, inlineCallbacks +from twisted.internet.defer import Deferred, ensureDeferred, inlineCallbacks from twisted.python.filepath import FilePath from yaml import safe_dump -from ..tahoe import async_retry, download, upload +from ..tahoe import ( + TahoeAPIError, + _scrub_cap, + async_retry, + download, + link, + make_directory, + upload, +) from .fixtures import Treq # A plausible value for the ``retry`` parameter of ``wait_for_path``. @@ -63,7 +74,7 @@ class TemporaryDirectoryResource(TestResourceManager): def make(self, dependency_resources): return FilePath(mkdtemp()) - def isDirty(self, resource): + def isDirty(self): # Can't detect when the directory is written to, so assume it # can never be reused. We could list the directory, but that might # not catch it being open as a cwd etc. @@ -137,8 +148,9 @@ class TahoeStorage: """ Start the node child process. """ + eliot = ["--eliot-destination", "file:" + self.node_dir.child("log.eliot").path] self.process = Popen( - TAHOE + ["run", self.node_dir.path], + TAHOE + eliot + ["run", self.node_dir.path], stdout=self.node_dir.child("stdout").open("wb"), stderr=self.node_dir.child("stderr").open("wb"), ) @@ -187,6 +199,7 @@ class TahoeStorageManager(TestResourceManager): Kill the storage node child process. """ storage.process.kill() + storage.process.wait() def make(self, dependency_resources): """ @@ -257,8 +270,9 @@ class TahoeClient: """ Start the node child process. """ + eliot = ["--eliot-destination", "file:" + self.node_dir.child("log.eliot").path] self.process = Popen( - TAHOE + ["run", self.node_dir.path], + TAHOE + eliot + ["run", self.node_dir.path], stdout=self.node_dir.child("stdout").open("wb"), stderr=self.node_dir.child("stderr").open("wb"), ) @@ -287,6 +301,7 @@ class TahoeClientManager(TestResourceManager): Kill the client node child process. """ client.process.kill() + client.process.wait() def make(self, dependency_resources): """ @@ -297,6 +312,49 @@ class TahoeClientManager(TestResourceManager): return client +class TahoeAPIErrorTests(TestCase): + """ + Tests for ``TahoeAPIError``. + """ + + @given(cap=write_capabilities().map(lambda uri: uri.to_string().decode("ascii"))) + def test_scrub_cap(self, cap): + """ + ``_scrub_cap`` returns a different string than it is called with. + """ + self.assertThat( + _scrub_cap(cap), + Not(Equals(cap)), + ) + + @given( + scheme=sampled_from(["http", "https"]), + host=sampled_from(["127.0.0.1", "localhost", "example.invalid"]), + port=integers(min_value=1, max_value=2 ** 16 - 1), + query=lists(tuples(text(), text())), + path_extra=lists(text()), + cap=write_capabilities().map(lambda uri: uri.to_string().decode("ascii")), + ) + def test_scrubbed_url(self, scheme, host, port, query, path_extra, cap): + """ + ``TahoeAPIError.url`` has capability strings scrubbed from it to avoid + accidentally leaking secrets in logs. + """ + original_path = ("uri", cap) + tuple(path_extra) + original = DecodedURL().replace( + scheme=scheme, host=host, port=port, path=original_path, query=query + ) + expected_path = ("uri", _scrub_cap(cap)) + tuple(path_extra) + expected = original.replace(path=expected_path) + + original_exc = TahoeAPIError("get", original, 200, "") + expected_exc = TahoeAPIError("get", expected, 200, "") + self.assertThat(original_exc, Equals(expected_exc)) + + +_client_manager = TahoeClientManager() + + class UploadDownloadTestCase(TestCase): """ Tests for ``upload`` and ``download``. @@ -306,7 +364,7 @@ class UploadDownloadTestCase(TestCase): run_tests_with = AsynchronousDeferredRunTest.make_factory(timeout=60.0) # Get a Tahoe-LAFS client node connected to a storage node. - resources = [("client", TahoeClientManager())] + resources = [("client", _client_manager)] def setUp(self): super().setUp() @@ -337,6 +395,84 @@ class UploadDownloadTestCase(TestCase): ) +class DirectoryTests(TestCase): + """ + Tests for directory-related functionality. + """ + + # Support test methods that return a Deferred. + run_tests_with = AsynchronousDeferredRunTest.make_factory(timeout=60.0) + + # Get a Tahoe-LAFS client node connected to a storage node. + resources = [("client", _client_manager)] + + def setUp(self): + super().setUp() + setUpResources(self, self.resources, None) + self.addCleanup(lambda: tearDownResources(self, self.resources, None)) + # AsynchronousDeferredRunTest sets reactor on us. + self.httpclient = self.useFixture(Treq(self.reactor, case=self)).client() + + @inlineCallbacks + def test_make_directory(self): + """ + ``make_directory`` returns a coroutine that completes with the capability + of a new, empty directory. + """ + dir_cap = yield Deferred.fromCoroutine( + make_directory(self.httpclient, self.client.node_url) + ) + + # If we can download it, consider that success. + outpath = FilePath(self.useFixture(TempDir()).join("dir_contents")) + yield Deferred.fromCoroutine( + download(self.httpclient, outpath, self.client.node_url, dir_cap) + ) + self.assertThat(outpath.getContent(), Not(Equals(b""))) + + @inlineCallbacks + def test_link(self): + """ + ``link`` adds an entry to a directory. + """ + tmp = FilePath(self.useFixture(TempDir()).path) + inpath = tmp.child("source") + inpath.setContent(b"some content") + + dir_cap = yield Deferred.fromCoroutine( + make_directory(self.httpclient, self.client.node_url) + ) + entry_name = "foo" + entry_cap = yield Deferred.fromCoroutine( + upload(self.httpclient, inpath, self.client.node_url), + ) + yield Deferred.fromCoroutine( + link( + self.httpclient, + self.client.node_url, + dir_cap, + entry_name, + entry_cap, + ), + ) + + outpath = tmp.child("destination") + yield Deferred.fromCoroutine( + download( + self.httpclient, + outpath, + self.client.node_url, + dir_cap, + child_path=[entry_name], + ), + ) + + self.assertThat( + outpath.getContent(), + Equals(inpath.getContent()), + ) + + class AsyncRetryTests(TestCase): """ Tests for ``async_retry``.