diff --git a/default.nix b/default.nix
index 47247993c937d7d3899c7667e9b22a3df8fe8f81..771b273f9da00b1eb1c18d4ec05e324d4c86bee9 100644
--- a/default.nix
+++ b/default.nix
@@ -64,7 +64,7 @@ in
         # the `.post999` looks weird enough that if someone really cares about
         # the version in use they will notice it and go searching for what's
         # going on and discover the real version specified by `src` below.
-        version = "1.17.0.post999";
+        version = "1.17.1.post999";
         # See https://github.com/DavHau/mach-nix/issues/190
         requirementsExtra =
           ''
diff --git a/nix/sources.json b/nix/sources.json
index 3107940ad6eb2e03ecd62262606c802987a8cdce..fee56665c6d1b33bd996147104cda9901abcf426 100644
--- a/nix/sources.json
+++ b/nix/sources.json
@@ -59,10 +59,10 @@
         "homepage": "https://tahoe-lafs.org/",
         "owner": "tahoe-lafs",
         "repo": "tahoe-lafs",
-        "rev": "tahoe-lafs-1.17.0",
-        "sha256": "0vjq7g1lfjd16y0iwnfsccp5k3q3av7wllkyqbsyd877a29nibzi",
+        "rev": "tahoe-lafs-1.17.1",
+        "sha256": "0vzl8xz4iwbq7d7saa6rimzgwj23s3vfgr3f428rbg0wp7dshs9s",
         "type": "tarball",
-        "url": "https://github.com/tahoe-lafs/tahoe-lafs/archive/tahoe-lafs-1.17.0.tar.gz",
+        "url": "https://github.com/tahoe-lafs/tahoe-lafs/archive/tahoe-lafs-1.17.1.tar.gz",
         "url_template": "https://github.com/<owner>/<repo>/archive/<rev>.tar.gz"
     }
 }
diff --git a/setup.cfg b/setup.cfg
index 768163ca8a71a5f9f8efc5f7020ad57bff10b28b..99a57fb165ca22ac3f1f97e160667f09e890bbe4 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -45,7 +45,7 @@ install_requires =
     # Tahoe has no stable Python API but we use its Python API so there's
     # basically no wiggle room here.  We still use a (really tiny) range
     # because our Nix packaging provides a Tahoe-LAFS with a .postNNN version.
-    tahoe-lafs >=1.17.0,<1.17.1
+    tahoe-lafs >=1.17.1,<1.17.2
     treq
     pyutil
     prometheus-client
diff --git a/src/_zkapauthorizer/_storage_server.py b/src/_zkapauthorizer/_storage_server.py
index aa11097ae705904c287babd26905ab05e6b52bf1..0a4b76db61fcc0d89a66ff52a09faab8a7d978cc 100644
--- a/src/_zkapauthorizer/_storage_server.py
+++ b/src/_zkapauthorizer/_storage_server.py
@@ -27,12 +27,17 @@ from functools import partial
 from os import listdir, stat
 from os.path import join
 from struct import calcsize, unpack
-from typing import Dict, List, Optional
+from typing import Any, Dict, List, Optional, Tuple
 
 import attr
-from allmydata.interfaces import RIStorageServer, TestAndWriteVectorsForShares
+from allmydata.interfaces import TestAndWriteVectorsForShares
 from allmydata.storage.common import storage_index_to_dir
-from allmydata.storage.immutable import ShareFile
+from allmydata.storage.immutable import (
+    BucketWriter,
+    FoolscapBucketReader,
+    FoolscapBucketWriter,
+    ShareFile,
+)
 from allmydata.storage.lease import LeaseInfo
 from allmydata.storage.mutable import MutableShareFile
 from allmydata.storage.server import StorageServer
@@ -47,6 +52,7 @@ from challenge_bypass_ristretto import (
 )
 from eliot import log_call, start_action
 from foolscap.api import Referenceable
+from foolscap.ipb import IRemoteReference
 from prometheus_client import CollectorRegistry, Histogram
 from twisted.internet.defer import Deferred
 from twisted.internet.interfaces import IReactorTime
@@ -189,7 +195,10 @@ class ZKAPAuthorizerStorageServer(Referenceable):
     # control it ourselves.
     LEASE_PERIOD = timedelta(days=31)
 
-    _original = attr.ib(validator=provides(RIStorageServer))
+    # A StorageServer instance.  Deliberately not validated with a
+    # provides() check because the test suite substitutes a fake server.
+    _original = attr.ib()
+
     _pass_value = pass_value_attribute()
     _signing_key = attr.ib(validator=instance_of(SigningKey))
     _spender = attr.ib(validator=provides(ISpender))
@@ -203,6 +212,12 @@ class ZKAPAuthorizerStorageServer(Referenceable):
     )
     _public_key = attr.ib(init=False)
     _metric_spending_successes = attr.ib(init=False)
+    _bucket_writer_disconnect_markers: Dict[
+        BucketWriter, Tuple[IRemoteReference, Any]
+    ] = attr.ib(
+        init=False,
+        default=attr.Factory(dict),
+    )
 
     @_public_key.default
     def _get_public_key(self):
@@ -211,6 +226,27 @@ class ZKAPAuthorizerStorageServer(Referenceable):
         # so that `self._signing_key` will be assigned when this runs.
         return PublicKey.from_signing_key(self._signing_key)
 
+    def _bucket_writer_closed(self, bw: BucketWriter):
+        """
+        This is registered as a callback with the storage backend and receives
+        notification when a bucket writer is closed.  It removes the
+        disconnection-based cleanup callback for the given bucket.
+        """
+        # This implementation was originally copied from
+        # allmydata.storage.server.FoolscapStorageServer.  Since we don't use
+        # Tahoe's Foolscap storage server layer we need to do this bucket
+        # writer bookkeeping ourselves.
+        if bw in self._bucket_writer_disconnect_markers:
+            canary, disconnect_marker = self._bucket_writer_disconnect_markers.pop(bw)
+            canary.dontNotifyOnDisconnect(disconnect_marker)
+
+    def __attrs_post_init__(self):
+        """
+        Finish initialization after attrs does its job.  This consists of
+        registering a cleanup handler with the storage backend.
+        """
+        self._original.register_bucket_writer_close_handler(self._bucket_writer_closed)
+
     def _get_spending_histogram_buckets(self):
         """
         Create the upper bounds for the ZKAP spending histogram.
@@ -253,7 +289,7 @@ class ZKAPAuthorizerStorageServer(Referenceable):
         Pass-through without pass check to allow clients to learn about our
         version and configuration in case it helps them decide how to behave.
         """
-        return self._original.remote_get_version()
+        return self._original.get_version()
 
     def remote_allocate_buckets(
         self,
@@ -302,7 +338,7 @@ class ZKAPAuthorizerStorageServer(Referenceable):
             allocated_size,
         )
 
-        alreadygot, bucketwriters = self._original._allocate_buckets(
+        alreadygot, bucketwriters = self._original.allocate_buckets(
             storage_index,
             renew_secret,
             cancel_secret,
@@ -330,7 +366,7 @@ class ZKAPAuthorizerStorageServer(Referenceable):
         # StorageServer.remote_allocate_buckets.
         for bw in bucketwriters.values():
             disconnect_marker = canary.notifyOnDisconnect(bw.disconnected)
-            self._original._bucket_writer_disconnect_markers[bw] = (
+            self._bucket_writer_disconnect_markers[bw] = (
                 canary,
                 disconnect_marker,
             )
@@ -338,14 +374,19 @@ class ZKAPAuthorizerStorageServer(Referenceable):
             self._public_key,
             validation.valid[:spent_passes],
         )
-        return alreadygot, bucketwriters
+        return alreadygot, {
+            k: FoolscapBucketWriter(bw) for (k, bw) in bucketwriters.items()
+        }
 
     def remote_get_buckets(self, storage_index):
         """
         Pass-through without pass check to let clients read immutable shares as
         long as those shares exist.
         """
-        return self._original.remote_get_buckets(storage_index)
+        return {
+            k: FoolscapBucketReader(bucket)
+            for (k, bucket) in self._original.get_buckets(storage_index).items()
+        }
 
     def remote_add_lease(self, passes, storage_index, *a, **kw):
         """
@@ -363,7 +404,7 @@ class ZKAPAuthorizerStorageServer(Referenceable):
             validation,
             self._original,
         )
-        result = self._original.remote_add_lease(storage_index, *a, **kw)
+        result = self._original.add_lease(storage_index, *a, **kw)
         self._spender.mark_as_spent(
             self._public_key,
             validation.valid,
@@ -376,7 +417,7 @@ class ZKAPAuthorizerStorageServer(Referenceable):
         Pass-through without a pass check to let clients inform us of possible
         issues with the system without incurring any cost to themselves.
         """
-        return self._original.remote_advise_corrupt_share(*a, **kw)
+        return self._original.advise_corrupt_share(*a, **kw)
 
     def remote_share_sizes(self, storage_index_or_slot, sharenums):
         with start_action(
@@ -521,7 +562,7 @@ class ZKAPAuthorizerStorageServer(Referenceable):
         Pass-through without a pass check to let clients read mutable shares as
         long as those shares exist.
         """
-        return self._original.remote_slot_readv(*a, **kw)
+        return self._original.slot_readv(*a, **kw)
 
 
 def check_pass_quantity(pass_value, validation, share_sizes):
diff --git a/src/_zkapauthorizer/tests/fixtures.py b/src/_zkapauthorizer/tests/fixtures.py
index 54d27b96c42014149e21d9cdcb83a6376d4df7f4..8803180f62410c0f62e7032563ef6af7ae142a3c 100644
--- a/src/_zkapauthorizer/tests/fixtures.py
+++ b/src/_zkapauthorizer/tests/fixtures.py
@@ -28,23 +28,24 @@ from ..controller import DummyRedeemer, PaymentController
 from ..model import VoucherStore, memory_connect, open_and_initialize
 
 
-@attr.s
+@attr.s(auto_attribs=True)
 class AnonymousStorageServer(Fixture):
     """
     Supply an instance of allmydata.storage.server.StorageServer which
     implements anonymous access to Tahoe-LAFS storage server functionality.
 
-    :ivar FilePath tempdir: The path to the server's storage on the
-        filesystem.
+    :ivar tempdir: The path to the server's storage on the filesystem.
 
-    :ivar allmydata.storage.server.StorageServer storage_server: The storage
-        server.
+    :ivar storage_server: The protocol-agnostic storage server backend.
 
-    :ivar twisted.internet.task.Clock clock: The ``IReactorTime`` provider to
-        supply to ``StorageServer`` for its time-checking needs.
+    :ivar clock: The ``IReactorTime`` provider to supply to ``StorageServer``
+        for its time-checking needs.
     """
 
-    clock = attr.ib()
+    clock: Clock = attr.ib()
+
+    tempdir: FilePath = attr.ib(default=None)
+    storage_server: StorageServer = attr.ib(default=None)
 
     def _setUp(self):
         self.tempdir = FilePath(self.useFixture(TempDir()).join(u"storage"))
diff --git a/src/_zkapauthorizer/tests/foolscap.py b/src/_zkapauthorizer/tests/foolscap.py
index 8ad3345c45c44634052bb680032c4f8c0c18ede1..b10b00ed48a1ccdc75d3c6037ebeaf0dd8fc66c1 100644
--- a/src/_zkapauthorizer/tests/foolscap.py
+++ b/src/_zkapauthorizer/tests/foolscap.py
@@ -17,7 +17,6 @@ Testing helpers related to Foolscap.
 """
 
 import attr
-from allmydata.interfaces import RIStorageServer
 from foolscap.api import Any, Copyable, Referenceable, RemoteInterface
 from foolscap.copyable import CopyableSlicer, ICopyable
 from twisted.internet.defer import fail, succeed
@@ -33,9 +32,9 @@ class RIEcho(RemoteInterface):
         return Any()
 
 
-@implementer(RIStorageServer)
 class StubStorageServer(object):
-    pass
+    def register_bucket_writer_close_handler(self, handler):
+        pass
 
 
 def get_anonymous_storage_server():
diff --git a/src/_zkapauthorizer/tests/storage_common.py b/src/_zkapauthorizer/tests/storage_common.py
index a168395ca073ba197ece412a39dbcbf346c2a3f6..39d85d25c6132ab0b8a14cddc267787b1e160619 100644
--- a/src/_zkapauthorizer/tests/storage_common.py
+++ b/src/_zkapauthorizer/tests/storage_common.py
@@ -36,13 +36,17 @@ from .strategies import bytes_for_share  # Not really a strategy...
 LEASE_INTERVAL = 60 * 60 * 24 * 31
 
 
-def cleanup_storage_server(storage_server):
+def reset_storage_server(storage_server):
     """
-    Delete all of the shares held by the given storage server.
+    Restore a storage server to a default state.  This includes
+    deleting all of the shares it holds.
 
     :param allmydata.storage.server.StorageServer storage_server: The storage
         server with some on-disk shares to delete.
     """
+    # A storage server is read-write by default.
+    storage_server.readonly_storage = False
+
     starts = [
         FilePath(storage_server.sharedir),
         FilePath(storage_server.corruption_advisory_dir),
@@ -60,7 +64,6 @@ def write_toy_shares(
     cancel_secret,
     sharenums,
     size,
-    canary,
 ):
     """
     Write some immutable shares to the given storage server.
@@ -71,19 +74,17 @@ def write_toy_shares(
     :param bytes cancel_secret:
     :param set[int] sharenums:
     :param int size:
-    :param IRemoteReference canary:
     """
-    _, allocated = storage_server.remote_allocate_buckets(
+    _, allocated = storage_server.allocate_buckets(
         storage_index,
         renew_secret,
         cancel_secret,
         sharenums,
         size,
-        canary=canary,
     )
     for (sharenum, writer) in allocated.items():
-        writer.remote_write(0, bytes_for_share(sharenum, size))
-        writer.remote_close()
+        writer.write(0, bytes_for_share(sharenum, size))
+        writer.close()
 
 
 def whitebox_write_sparse_share(sharepath, version, size, leases, now):
diff --git a/src/_zkapauthorizer/tests/test_storage_protocol.py b/src/_zkapauthorizer/tests/test_storage_protocol.py
index ceb34deb3d1143a55f8072de31f658992ccce226..10e30c0fa51da66f27f975a44bc16dd6689cd65f 100644
--- a/src/_zkapauthorizer/tests/test_storage_protocol.py
+++ b/src/_zkapauthorizer/tests/test_storage_protocol.py
@@ -61,10 +61,10 @@ from .foolscap import LocalRemote
 from .matchers import matches_spent_passes, matches_version_dictionary
 from .storage_common import (
     LEASE_INTERVAL,
-    cleanup_storage_server,
     get_passes,
     pass_factory,
     privacypass_passes,
+    reset_storage_server,
     whitebox_write_sparse_share,
     write_toy_shares,
 )
@@ -191,7 +191,7 @@ class ShareTests(TestCase):
         self.spending_recorder.reset()
 
         # And clean out any shares that might confuse things.
-        cleanup_storage_server(self.anonymous_storage_server)
+        reset_storage_server(self.anonymous_storage_server)
 
     def test_get_version(self):
         """
@@ -416,7 +416,6 @@ class ShareTests(TestCase):
             cancel_secret,
             existing_sharenums,
             size,
-            canary=self.canary,
         )
 
         # Let some time pass so leases added after this point will look
@@ -510,7 +509,6 @@ class ShareTests(TestCase):
             cancel_secret,
             sharenums,
             size,
-            canary=self.canary,
         )
 
         self.assertThat(
@@ -550,7 +548,7 @@ class ShareTests(TestCase):
         # Perhaps put some more leases on it.  Leases might impact our
         # ability to determine share data size.
         for renew_secret in leases:
-            self.anonymous_storage_server.remote_add_lease(
+            self.anonymous_storage_server.add_lease(
                 storage_index,
                 renew_secret,
                 cancel_secret,
@@ -598,7 +596,6 @@ class ShareTests(TestCase):
                 cancel_secret,
                 sharenums,
                 size,
-                canary,
             ),
         )
 
@@ -825,7 +822,6 @@ class ShareTests(TestCase):
             cancel_secret,
             {sharenum},
             size,
-            canary=self.canary,
         )
 
         self.assertThat(
diff --git a/src/_zkapauthorizer/tests/test_storage_server.py b/src/_zkapauthorizer/tests/test_storage_server.py
index 73a0001fc38d5924d133d09682effa3c20727eb0..a0237bb7b82cf5c6069d7ae6f243bb37479f22d1 100644
--- a/src/_zkapauthorizer/tests/test_storage_server.py
+++ b/src/_zkapauthorizer/tests/test_storage_server.py
@@ -19,6 +19,7 @@ Tests for ``_zkapauthorizer._storage_server``.
 from random import shuffle
 from time import time
 
+from allmydata.interfaces import NoSpace
 from allmydata.storage.mutable import MutableShareFile
 from challenge_bypass_ristretto import PublicKey, random_signing_key
 from foolscap.referenceable import LocalReferenceable
@@ -44,7 +45,7 @@ from ..storage_common import (
 from .common import skipIf
 from .fixtures import AnonymousStorageServer
 from .matchers import matches_spent_passes, raises
-from .storage_common import cleanup_storage_server, get_passes, write_toy_shares
+from .storage_common import get_passes, reset_storage_server, write_toy_shares
 from .strategies import (
     lease_cancel_secrets,
     lease_renew_secrets,
@@ -221,7 +222,8 @@ class PassValidationTests(TestCase):
         # Hypothesis and testtools fixtures don't play nicely together in a
         # way that allows us to just move everything from `setUp` into this
         # method.
-        cleanup_storage_server(self.anonymous_storage_server)
+        reset_storage_server(self.anonymous_storage_server)
+
         self.spending_recorder.reset()
 
         # Reset all of the metrics, too, so the individual tests have a
@@ -533,7 +535,6 @@ class PassValidationTests(TestCase):
             cancel_secret,
             sharenums,
             allocated_size,
-            LocalReferenceable(None),
         )
 
         # Advance time to a point where the lease is expired.  This simplifies
@@ -807,7 +808,6 @@ class PassValidationTests(TestCase):
             cancel_secret,
             existing_sharenums,
             size,
-            LocalReferenceable(None),
         )
 
         # The client will present this many passes.
@@ -878,7 +878,6 @@ class PassValidationTests(TestCase):
             cancel_secret,
             sharenums,
             allocated_size,
-            LocalReferenceable(None),
         )
 
         num_passes = required_passes(
@@ -919,18 +918,21 @@ class PassValidationTests(TestCase):
 
     @given(
         storage_index=storage_indexes(),
-        renew_secret=lease_renew_secrets(),
+        renew_secrets=lists(lease_renew_secrets(), min_size=2, max_size=2, unique=True),
         cancel_secret=lease_cancel_secrets(),
         sharenums=sharenum_sets(),
         allocated_size=sizes(),
     )
     def test_add_lease_metrics_on_failure(
-        self, storage_index, renew_secret, cancel_secret, sharenums, allocated_size
+        self, storage_index, renew_secrets, cancel_secret, sharenums, allocated_size
     ):
         """
         If the ``add_lease`` operation fails then the successful pass spending
         metric is not incremented.
         """
+        # We have two renew secrets so we can operate on two distinct leases.
+        renew_secret, another_renew_secret = renew_secrets
+
         # Put some shares up there to target with the add_lease operation.
         write_toy_shares(
             self.anonymous_storage_server,
@@ -939,7 +941,6 @@ class PassValidationTests(TestCase):
             cancel_secret,
             sharenums,
             allocated_size,
-            LocalReferenceable(None),
         )
 
         num_passes = required_passes(
@@ -951,12 +952,9 @@ class PassValidationTests(TestCase):
             self.signing_key,
         )
 
-        # Tahoe doesn't make it very easy to make an add_lease operation fail
-        # so monkey-patch something broken in.  After 1.17.0 we can set
-        # `reserved_space` on StorageServer to a very large number and the
-        # server should refuse to allocate space for a *new* lease (which
-        # means we need to use a different renew secret for the next step.
-        self.anonymous_storage_server.remote_add_lease = lambda *a, **kw: 1 / 0
+        # Turn off space-allocating operations entirely.  Since there will be
+        # no space for a new lease, the operation will fail.
+        self.anonymous_storage_server.readonly_storage = True
 
         try:
             self.storage_server.doRemoteCall(
@@ -965,14 +963,14 @@ class PassValidationTests(TestCase):
                 dict(
                     passes=_encode_passes(valid_passes),
                     storage_index=storage_index,
-                    renew_secret=renew_secret,
+                    renew_secret=another_renew_secret,
                     cancel_secret=cancel_secret,
                 ),
             )
-        except ZeroDivisionError:
+        except NoSpace:
             pass
         else:
-            self.fail("expected our ZeroDivisionError to be raised")
+            self.fail("expected NoSpace to be raised")
 
         after_count = read_spending_success_histogram_total(self.storage_server)
         self.expectThat(