diff --git a/src/_zkapauthorizer/_storage_client.py b/src/_zkapauthorizer/_storage_client.py
index eafc1196f95d5a826cc846f079c452b9c91bfd43..8269a6442a90fb6d32f468e8fd630c057799eec5 100644
--- a/src/_zkapauthorizer/_storage_client.py
+++ b/src/_zkapauthorizer/_storage_client.py
@@ -25,7 +25,10 @@ import attr
 from zope.interface import (
     implementer,
 )
-
+from twisted.internet.defer import (
+    inlineCallbacks,
+    returnValue,
+)
 from allmydata.interfaces import (
     IStorageServer,
 )
@@ -38,6 +41,7 @@ from .storage_common import (
     renew_lease_message,
     slot_testv_and_readv_and_writev_message,
     has_writes,
+    get_implied_data_length,
 )
 
 @implementer(IStorageServer)
@@ -164,6 +168,7 @@ class ZKAPAuthorizerStorageClient(object):
             reason,
         )
 
+    @inlineCallbacks
     def slot_testv_and_readv_and_writev(
             self,
             storage_index,
@@ -172,17 +177,64 @@ class ZKAPAuthorizerStorageClient(object):
             r_vector,
     ):
         if has_writes(tw_vectors):
-            passes = self._get_encoded_passes(slot_testv_and_readv_and_writev_message(storage_index), 1)
+            # When performing writes, if we're increasing the storage
+            # requirement, we need to spend more passes.  Unfortunately we
+            # don't know what the current storage requirements are at this
+            # layer of the system.  It's *likely* that a higher layer does but
+            # that doesn't help us, even if it were guaranteed.  So, instead,
+            # ask the server.  Invoke a ZKAPAuthorizer-supplied remote method
+            # on the storage server that will give us a really good estimate
+            # of the current size of all of the specified shares (keys of
+            # tw_vectors).
+            current_size = yield self._rref.callRemote(
+                "slot_share_sizes",
+                storage_index,
+                set(tw_vectors),
+            )
+            if current_size is None:
+                # The server says it doesn't even know about these shares for
+                # this storage index.  Thus, we have not yet paid anything for
+                # it and we're about to create it.
+                current_pass_count = 0
+            else:
+                # Compute how much has already been paid for the storage
+                # that's already allocated.  We're not required to pay this
+                # again.
+                current_pass_count = required_passes(BYTES_PER_PASS, {0}, current_size)
+
+            # Determine what the share size which will result from the write
+            # we're about to perform.
+            implied_sizes = (
+                get_implied_data_length(data_vector, length)
+                for (_, data_vector, length)
+                in tw_vectors.values()
+            )
+            # Total that across all of the shares and figure how many passes
+            # it would cost if we had to pay for all of it.
+            new_size = sum(implied_sizes, 0)
+            new_pass_count = required_passes(BYTES_PER_PASS, {0}, new_size)
+            # Now compute how much hasn't yet been paid.
+            pass_count_increase = new_pass_count - current_pass_count
+            # And prepare to pay it.
+            passes = self._get_encoded_passes(
+                slot_testv_and_readv_and_writev_message(storage_index),
+                pass_count_increase,
+            )
         else:
+            # Non-write operations on slots are free.
             passes = []
-        return self._rref.callRemote(
-            "slot_testv_and_readv_and_writev",
-            passes,
-            storage_index,
-            secrets,
-            tw_vectors,
-            r_vector,
-        )
+
+        # Perform the operation with the passes we determined are required.
+        returnValue((
+            yield self._rref.callRemote(
+                "slot_testv_and_readv_and_writev",
+                passes,
+                storage_index,
+                secrets,
+                tw_vectors,
+                r_vector,
+            )
+        ))
 
     def slot_readv(
             self,
diff --git a/src/_zkapauthorizer/_storage_server.py b/src/_zkapauthorizer/_storage_server.py
index b95eb3b3633c5336211b7f4fb9e9f8e865128849..1035598dbb747f0dcfb5b779c00c193ef7d4644c 100644
--- a/src/_zkapauthorizer/_storage_server.py
+++ b/src/_zkapauthorizer/_storage_server.py
@@ -24,10 +24,22 @@ from __future__ import (
     absolute_import,
 )
 
+from errno import (
+    ENOENT,
+)
+
 from functools import (
     partial,
 )
 
+from os.path import (
+    join,
+)
+from os import (
+    listdir,
+    stat,
+)
+
 import attr
 from attr.validators import (
     provides,
@@ -47,6 +59,9 @@ from foolscap.ipb import (
 from allmydata.interfaces import (
     RIStorageServer,
 )
+from allmydata.storage.common import (
+    storage_index_to_dir,
+)
 from privacypass import (
     TokenPreimage,
     VerificationSignature,
@@ -71,8 +86,13 @@ from .storage_common import (
     renew_lease_message,
     slot_testv_and_readv_and_writev_message,
     has_writes,
+    get_implied_data_length,
 )
 
+# See allmydata/storage/mutable.py
+SLOT_HEADER_SIZE = 468
+LEASE_TRAILER_SIZE = 4
+
 class MorePassesRequired(Exception):
     """
     Storage operations fail with ``MorePassesRequired`` when they are not
@@ -220,6 +240,14 @@ class ZKAPAuthorizerStorageServer(Referenceable):
         """
         return self._original.remote_advise_corrupt_share(*a, **kw)
 
+    def remote_slot_share_sizes(self, storage_index, sharenums):
+        try:
+            return get_slot_share_size(self._original, storage_index, sharenums)
+        except OSError as e:
+            if e.errno == ENOENT:
+                return None
+            raise
+
     def remote_slot_testv_and_readv_and_writev(
             self,
             passes,
@@ -236,23 +264,45 @@ class ZKAPAuthorizerStorageServer(Referenceable):
             data in already-allocated storage.  These cases may not be the
             same from the perspective of pass validation.
         """
+        # print("passes = {}".format(len(passes)))
+        # print("tw_vectors = {}".format(tw_vectors))
         renew_leases = False
 
         if has_writes(tw_vectors):
-            # Writes are allowed to shares with active leases.
-            if not has_active_lease(
-                self._original,
-                storage_index,
-                self._clock.seconds(),
-            ):
-                # Passes may be supplied with the write to create the
-                # necessary lease as part of the same operation.  This must be
-                # supported because there is no separate protocol action to
-                # *create* a slot.  Clients just begin writing to it.
-                valid_passes = self._validate_passes(
-                    slot_testv_and_readv_and_writev_message(storage_index),
-                    passes,
+            # Passes may be supplied with the write to create the
+            # necessary lease as part of the same operation.  This must be
+            # supported because there is no separate protocol action to
+            # *create* a slot.  Clients just begin writing to it.
+            valid_passes = self._validate_passes(
+                slot_testv_and_readv_and_writev_message(storage_index),
+                passes,
+            )
+            if has_active_lease(self._original, storage_index, self._clock.seconds()):
+                current_length = get_slot_share_size(self._original, storage_index, tw_vectors.keys())
+                # Perform a sum() here because we're going to lie to
+                # required_passes and tell it the allocated size is for a
+                # single share.  The tw_vectors API lets different shares be
+                # different sizes, though I don't think the Tahoe-LAFS client
+                # intentionally causes this to happen.  Letting such a case
+                # pass by the pass calculation would possibly offer free
+                # storage to altered clients.
+                implied_sizes = (
+                    get_implied_data_length(data_vector, new_length)
+                    for (_, data_vector, new_length)
+                    in tw_vectors.values()
                 )
+                new_length = sum(implied_sizes, 0)
+                current_passes = required_passes(BYTES_PER_PASS, {0}, current_length)
+                new_passes = required_passes(BYTES_PER_PASS, {0}, new_length)
+                required_new_passes = new_passes - current_passes
+                # print("Current length: {}".format(current_length))
+                # print("New length: {}".format(new_length))
+                # print("Current passes: {}".format(current_passes))
+                # print("New passes: {}".format(new_passes))
+                # print("Required new passes: {}".format(required_new_passes))
+                if required_new_passes > len(valid_passes):
+                    raise MorePassesRequired(len(valid_passes), required_new_passes)
+            else:
                 check_pass_quantity_for_mutable_write(len(valid_passes), tw_vectors)
                 renew_leases = True
 
@@ -277,38 +327,6 @@ class ZKAPAuthorizerStorageServer(Referenceable):
         return self._original.remote_slot_readv(*a, **kw)
 
 
-def get_sharenums(tw_vectors):
-    """
-    :param tw_vectors: See
-        ``allmydata.interfaces.TestAndWriteVectorsForShares``.
-
-    :return set[int]: The share numbers which the given test/write vectors would write to.
-    """
-    return set(
-        sharenum
-        for (sharenum, (test, data, new_length))
-        in tw_vectors.items()
-        if data
-    )
-
-
-def get_allocated_size(tw_vectors):
-    """
-    :param tw_vectors: See
-        ``allmydata.interfaces.TestAndWriteVectorsForShares``.
-
-    :return int: The largest position ``tw_vectors`` writes in any share.
-    """
-    return max(
-        list(
-            max(offset + len(s) for (offset, s) in data)
-            for (sharenum, (test, data, new_length))
-            in tw_vectors.items()
-            if data
-        ),
-    )
-
-
 def has_active_lease(storage_server, storage_index, now):
     """
     :param allmydata.storage.server.StorageServer storage_server: A storage
@@ -345,6 +363,10 @@ def check_pass_quantity_for_write(valid_count, sharenums, allocated_size):
     :return: ``None`` if the number of valid passes given is sufficient.
     """
     required_pass_count = required_passes(BYTES_PER_PASS, sharenums, allocated_size)
+    # print("valid_count = {}".format(valid_count))
+    # print("sharenums = {}".format(len(sharenums)))
+    # print("allocated size = {}".format(allocated_size))
+    # print("required_pass_count = {}".format(required_pass_count))
     if valid_count < required_pass_count:
         raise MorePassesRequired(
             valid_count,
@@ -362,9 +384,60 @@ def check_pass_quantity_for_mutable_write(valid_count, tw_vectors):
     :param tw_vectors: See
         ``allmydata.interfaces.TestAndWriteVectorsForShares``.
     """
-    sharenums = get_sharenums(tw_vectors)
-    allocated_size = get_allocated_size(tw_vectors)
-    check_pass_quantity_for_write(valid_count, sharenums, allocated_size)
+    implied_sizes = (
+        get_implied_data_length(data_vector, new_length)
+        for (_, data_vector, new_length)
+        in tw_vectors.values()
+    )
+    total_implied_size = sum(implied_sizes, 0)
+    check_pass_quantity_for_write(valid_count, {0}, total_implied_size)
+
+
+def get_slot_share_size(storage_server, storage_index, sharenums):
+    """
+    Total the on-disk storage committed to the given shares in the given
+    storage index.
+
+    :param allmydata.storage.server.StorageServer storage_server: The storage
+        server which owns the on-disk storage.
+
+    :param bytes storage_index: The storage index to inspect.
+
+    :param list[int] sharenums: The share numbers to consider.
+
+    :return int: The number of bytes the given shares use on disk.  Note this
+        is naive with respect to filesystem features like compression or
+        sparse files.  It is just a total of the size reported by the
+        filesystem.
+    """
+    total = 0
+    bucket = join(storage_server.sharedir, storage_index_to_dir(storage_index))
+    for candidate in listdir(bucket):
+        try:
+            sharenum = int(candidate)
+        except ValueError:
+            pass
+        else:
+            if sharenum in sharenums:
+                try:
+                    metadata = stat(join(bucket, candidate))
+                except Exception as e:
+                    print(e)
+                else:
+                    # Compared to calculating how much *user* data we're
+                    # storing, the on-disk file is larger by at *least*
+                    # SLOT_HEADER_SIZE.  There is also a variable sized
+                    # trailer which is harder to compute but which is at least
+                    # LEASE_TRAILER_SIZE.  Fortunately it's often exactly
+                    # LEASE_TRAILER_SIZE so I'm just going to ignore it for
+                    # now.
+                    #
+                    # By measuring that the slots are larger than the data the
+                    # user is storing we'll overestimate how many passes are
+                    # required right around the boundary between two costs.
+                    # Oops.
+                    total += (metadata.st_size - SLOT_HEADER_SIZE - LEASE_TRAILER_SIZE)
+    return total
 
 
 # I don't understand why this is required.
diff --git a/src/_zkapauthorizer/foolscap.py b/src/_zkapauthorizer/foolscap.py
index 948aff38eb681754af6e24e57d5252ce34254196..c9b865a66d01cb28d17c757f1971258aa34b7dc6 100644
--- a/src/_zkapauthorizer/foolscap.py
+++ b/src/_zkapauthorizer/foolscap.py
@@ -6,7 +6,9 @@ from foolscap.constraint import (
     ByteStringConstraint,
 )
 from foolscap.api import (
+    SetOf,
     ListOf,
+    ChoiceOf,
 )
 from foolscap.remoteinterface import (
     RemoteMethodSchema,
@@ -14,7 +16,10 @@ from foolscap.remoteinterface import (
 )
 
 from allmydata.interfaces import (
+    MAX_BUCKETS,
+    StorageIndex,
     RIStorageServer,
+    Offset,
 )
 
 # The Foolscap convention seems to be to try to constrain inputs to valid
@@ -23,11 +28,18 @@ from allmydata.interfaces import (
 # well.  Though it may still make sense on a non-Foolscap protocol (eg HTTP)
 # which Tahoe-LAFS may eventually support.
 #
-# In any case, for now, pick some fairly arbitrary value.  I am deliberately
-# picking a small number here and expect to have to raise.  However, ideally,
-# a client could accomplish a lot with a few passes while also not wasting a
-# lot of value.
-_MAXIMUM_PASSES_PER_CALL = 10
+# If a pass is worth 128 KiB of storage for some amount of time, 2 ** 20
+# passes is worth 128 GiB of storage for some amount of time.  It is an
+# arbitrary upper limit on the size of immutable files but maybe it's large
+# enough to not be an issue for a while.
+#
+# The argument for having a limit here at all is protection against denial of
+# service attacks that exhaust server memory by creating unbearably large
+# lists.
+#
+# A limit of 2 ** 20 passes translates to 177 MiB (times some constant factor
+# for Foolscap/Python overhead).  That should be tolerable.
+_MAXIMUM_PASSES_PER_CALL = 2 ** 20
 
 # This is the length of a serialized Ristretto-flavored PrivacyPass pass The
 # pass is a combination of token preimages and unblinded token signatures,
@@ -110,6 +122,19 @@ class RIPrivacyPassAuthorizedStorageServer(RemoteInterface):
 
     get_buckets = RIStorageServer["get_buckets"]
 
+    def slot_share_sizes(
+            storage_index=StorageIndex,
+            sharenums=SetOf(int, maxLength=MAX_BUCKETS),
+    ):
+        """
+        Get the size of the given shares in the given storage index.  If there are
+        no shares, ``None``.
+
+        The reported size may be larger than the actual share size if there
+        are more than four leases on the share.
+        """
+        return ChoiceOf(None, Offset)
+
     slot_readv = RIStorageServer["slot_readv"]
 
     slot_testv_and_readv_and_writev = add_passes(
diff --git a/src/_zkapauthorizer/storage_common.py b/src/_zkapauthorizer/storage_common.py
index 5baacb22be69bb3e3473c3e7b8b22a7994320845..21b7e41d2c65c9287e5b75f241b809c8dba2d4be 100644
--- a/src/_zkapauthorizer/storage_common.py
+++ b/src/_zkapauthorizer/storage_common.py
@@ -75,7 +75,61 @@ def has_writes(tw_vectors):
     :return bool: ``True`` if any only if there are writes in ``tw_vectors``.
     """
     return any(
-        data
+        data or (new_length is not None)
         for (test, data, new_length)
         in tw_vectors.values()
     )
+
+
+def get_sharenums(tw_vectors):
+    """
+    :param tw_vectors: See
+        ``allmydata.interfaces.TestAndWriteVectorsForShares``.
+
+    :return set[int]: The share numbers which the given test/write vectors would write to.
+    """
+    return set(
+        sharenum
+        for (sharenum, (test, data, new_length))
+        in tw_vectors.items()
+        if data
+    )
+
+
+def get_allocated_size(tw_vectors):
+    """
+    :param tw_vectors: See
+        ``allmydata.interfaces.TestAndWriteVectorsForShares``.
+
+    :return int: The largest position ``tw_vectors`` writes in any share.
+    """
+    return max(
+        list(
+            max(offset + len(s) for (offset, s) in data)
+            for (sharenum, (test, data, new_length))
+            in tw_vectors.items()
+            if data
+        ),
+    )
+
+
+def get_implied_data_length(data_vector, length):
+    """
+    :param data_vector: See ``allmydata.interfaces.DataVector``.
+
+    :param length: ``None`` or an overriding value for the length of the data.
+        This corresponds to the *new length* in
+        ``allmydata.interfaces.TestAndWriteVectorsForShares``.  It may be
+        smaller than the result would be considering only ``data_vector`` if
+        there is a truncation or larger if there is a zero-filled extension.
+
+    :return int: The amount of data, in bytes, implied by a data vector and a
+        size.
+    """
+    if length is None:
+        return max(
+            offset + len(data)
+            for (offset, data)
+            in data_vector
+        )
+    return length
diff --git a/src/_zkapauthorizer/tests/__init__.py b/src/_zkapauthorizer/tests/__init__.py
index 5019fa0eacdc71e14869e1c2dca6735e6626a433..1fd9a9da7f721537646cd78176a3c063909b2101 100644
--- a/src/_zkapauthorizer/tests/__init__.py
+++ b/src/_zkapauthorizer/tests/__init__.py
@@ -42,7 +42,14 @@ def _configure_hypothesis():
         deadline=None,
     )
 
+    settings.register_profile(
+        "big",
+        max_examples=10000,
+        deadline=None,
+    )
+
     profile_name = environ.get("ZKAPAUTHORIZER_HYPOTHESIS_PROFILE", "default")
     settings.load_profile(profile_name)
+    print("Loaded profile {}".format(profile_name))
 
 _configure_hypothesis()
diff --git a/src/_zkapauthorizer/tests/storage_common.py b/src/_zkapauthorizer/tests/storage_common.py
new file mode 100644
index 0000000000000000000000000000000000000000..6ae3a4c2c67fa2077e537ec3f33d06c96a94b4d1
--- /dev/null
+++ b/src/_zkapauthorizer/tests/storage_common.py
@@ -0,0 +1,37 @@
+# Copyright 2019 PrivateStorage.io, LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+``allmydata.storage``-related helpers shared across the test suite.
+"""
+
+from twisted.python.filepath import (
+    FilePath,
+)
+
+def cleanup_storage_server(storage_server):
+    """
+    Delete all of the shares held by the given storage server.
+
+    :param allmydata.storage.server.StorageServer storage_server: The storage
+        server with some on-disk shares to delete.
+    """
+    starts = [
+        FilePath(storage_server.sharedir),
+        FilePath(storage_server.corruption_advisory_dir),
+    ]
+    for start in starts:
+        for p in start.walk():
+            if p is not start:
+                p.remove()
diff --git a/src/_zkapauthorizer/tests/strategies.py b/src/_zkapauthorizer/tests/strategies.py
index 037d5a331a5ee6c74bd9f0cb41bdbbb54bb007ff..b9b781271f8d78d800e5a75537734baa7d858fa9 100644
--- a/src/_zkapauthorizer/tests/strategies.py
+++ b/src/_zkapauthorizer/tests/strategies.py
@@ -319,9 +319,7 @@ def sharenum_sets():
     return sets(
         sharenums(),
         min_size=1,
-        # This could go as high as 255 but to avoid tripping over the limits
-        # discussed in sizes(), keep it smaller.
-        max_size=8,
+        max_size=256,
     )
 
 
@@ -335,7 +333,7 @@ def sizes():
         # For the moment there are some assumptions in the test suite that
         # limit us to an amount of storage that can be paid for with one ZKAP.
         # That will be fixed eventually.  For now, keep the sizes pretty low.
-        max_value=2 ** 8,
+        max_value=2 ** 16,
     )
 
 
@@ -346,7 +344,7 @@ def offsets():
     return integers(
         min_value=0,
         # Just for practical purposes...
-        max_value=2 ** 8,
+        max_value=2 ** 16,
     )
 
 
diff --git a/src/_zkapauthorizer/tests/test_storage_protocol.py b/src/_zkapauthorizer/tests/test_storage_protocol.py
index d0fe8bb05ef9efb366a25ac88073f2999f83ba97..9789d50a63e7c359de9424e42b70060bd35fd10d 100644
--- a/src/_zkapauthorizer/tests/test_storage_protocol.py
+++ b/src/_zkapauthorizer/tests/test_storage_protocol.py
@@ -89,6 +89,9 @@ from .matchers import (
 from .fixtures import (
     AnonymousStorageServer,
 )
+from .storage_common import (
+    cleanup_storage_server,
+)
 from ..api import (
     ZKAPAuthorizerStorageServer,
     ZKAPAuthorizerStorageClient,
@@ -139,23 +142,6 @@ class LocalRemote(object):
         )
 
 
-def assume_one_pass(test_and_write_vectors_for_shares):
-    """
-    Assume that the writes represented by the given ``TestAndWriteVectors``
-    will cost at most one pass.
-    """
-    from .._storage_server import (
-        BYTES_PER_PASS,
-        get_sharenums,
-        get_allocated_size,
-        required_passes,
-    )
-    tw_vectors = {k: v.for_call() for (k, v) in test_and_write_vectors_for_shares.items()}
-    sharenums = get_sharenums(tw_vectors)
-    allocated_size = get_allocated_size(tw_vectors)
-    assume(required_passes(BYTES_PER_PASS, sharenums, allocated_size) <= 1)
-
-
 class ShareTests(TestCase):
     """
     Tests for interaction with shares.
@@ -216,9 +202,6 @@ class ShareTests(TestCase):
         resulting buckets can be read back using *get_buckets* and methods of
         those resulting buckets.
         """
-        # XXX
-        assume(len(sharenums) * size < 128 * 1024 * 10)
-
         # Hypothesis causes our storage server to be used many times.  Clean
         # up between iterations.
         cleanup_storage_server(self.anonymous_storage_server)
@@ -406,9 +389,6 @@ class ShareTests(TestCase):
         Mutable share data written using *slot_testv_and_readv_and_writev* can be
         read back as-written and without spending any more passes.
         """
-        # XXX
-        assume_one_pass(test_and_write_vectors_for_shares)
-
         # Hypothesis causes our storage server to be used many times.  Clean
         # up between iterations.
         cleanup_storage_server(self.anonymous_storage_server)
@@ -459,9 +439,6 @@ class ShareTests(TestCase):
         *slot_testv_and_readv_and_writev* any leases on the corresponding slot
         remain the same.
         """
-        # XXX
-        assume_one_pass(test_and_write_vectors_for_shares)
-
         # Hypothesis causes our storage server to be used many times.  Clean
         # up between iterations.
         cleanup_storage_server(self.anonymous_storage_server)
@@ -682,20 +659,3 @@ def write_toy_shares(
     for (sharenum, writer) in allocated.items():
         writer.remote_write(0, bytes_for_share(sharenum, size))
         writer.remote_close()
-
-
-def cleanup_storage_server(storage_server):
-    """
-    Delete all of the shares held by the given storage server.
-
-    :param allmydata.storage.server.StorageServer storage_server: The storage
-        server with some on-disk shares to delete.
-    """
-    starts = [
-        FilePath(storage_server.sharedir),
-        FilePath(storage_server.corruption_advisory_dir),
-    ]
-    for start in starts:
-        for p in start.walk():
-            if p is not start:
-                p.remove()
diff --git a/src/_zkapauthorizer/tests/test_storage_server.py b/src/_zkapauthorizer/tests/test_storage_server.py
index 1ef1186984c36bf5b29bfb00c4fac34bf14a6508..ebd26eb16971a14699f504d586178525830a1630 100644
--- a/src/_zkapauthorizer/tests/test_storage_server.py
+++ b/src/_zkapauthorizer/tests/test_storage_server.py
@@ -57,10 +57,14 @@ from .strategies import (
     write_enabler_secrets,
     lease_renew_secrets,
     lease_cancel_secrets,
+    test_and_write_vectors_for_shares,
 )
 from .fixtures import (
     AnonymousStorageServer,
 )
+from .storage_common import (
+    cleanup_storage_server,
+)
 from ..api import (
     ZKAPAuthorizerStorageServer,
     MorePassesRequired,
@@ -68,6 +72,12 @@ from ..api import (
 from ..storage_common import (
     BYTES_PER_PASS,
     allocate_buckets_message,
+    slot_testv_and_readv_and_writev_message,
+    required_passes,
+    get_sharenums,
+    get_allocated_size,
+    get_implied_data_length,
+
 )
 
 
@@ -107,13 +117,16 @@ class PassValidationTests(TestCase):
             ),
         )
 
-
     def test_allocate_buckets_fails_without_enough_passes(self):
         """
         ``remote_allocate_buckets`` fails with ``MorePassesRequired`` if it is
         passed fewer passes than it requires for the amount of data to be
         stored.
         """
+        # Hypothesis causes our storage server to be used many times.  Clean
+        # up between iterations.
+        cleanup_storage_server(self.anonymous_storage_server)
+
         required_passes = 2
         share_nums = {3, 7}
         allocated_size = int((required_passes * BYTES_PER_PASS) / len(share_nums))
@@ -156,8 +169,12 @@ class PassValidationTests(TestCase):
         """
         If ``remote_slot_testv_and_readv_and_writev`` is invoked to perform
         initial writes on shares without supplying passes, the operation fails
-        with ``LeaseRenewalRequired``.
+        with ``MorePassesRequired``.
         """
+        # Hypothesis causes our storage server to be used many times.  Clean
+        # up between iterations.
+        cleanup_storage_server(self.anonymous_storage_server)
+
         data = b"01234567"
         offset = 0
         sharenum = 0
@@ -183,7 +200,143 @@ class PassValidationTests(TestCase):
                 Equals(1),
             )
         else:
-            self.fail("expected LeaseRenewalRequired, got {}".format(result))
+            self.fail("expected MorePassesRequired, got {}".format(result))
+
+
+    def _test_extend_mutable_fails_without_passes(
+            self,
+            storage_index,
+            secrets,
+            test_and_write_vectors_for_shares,
+            make_data_vector,
+    ):
+        """
+        Verify that increasing the storage requirements of a slot without
+        supplying more passes fails.
+
+        :param make_data_vector: A one-argument callable.  It will be called
+            with the current length of a slot share.  It should return a write
+            vector which will increase the storage requirements of that slot
+            share by at least BYTES_PER_PASS.
+        """
+        # Hypothesis causes our storage server to be used many times.  Clean
+        # up between iterations.
+        cleanup_storage_server(self.anonymous_storage_server)
+
+        tw_vectors = {
+            k: v.for_call()
+            for (k, v)
+            in test_and_write_vectors_for_shares.items()
+        }
+        sharenums = get_sharenums(tw_vectors)
+        allocated_size = get_allocated_size(tw_vectors)
+        valid_passes = make_passes(
+            self.signing_key,
+            slot_testv_and_readv_and_writev_message(storage_index),
+            list(
+                RandomToken.create()
+                for i
+                in range(required_passes(BYTES_PER_PASS, sharenums, allocated_size))
+            ),
+        )
+
+        # Create an initial share to toy with.
+        test, read = self.storage_server.doRemoteCall(
+            "slot_testv_and_readv_and_writev",
+            (),
+            dict(
+                passes=valid_passes,
+                storage_index=storage_index,
+                secrets=secrets,
+                tw_vectors=tw_vectors,
+                r_vector=[],
+            ),
+        )
+        self.assertThat(
+            test,
+            Equals(True),
+            "Server denied initial write.",
+        )
+
+        # Try to grow one of the shares by BYTES_PER_PASS which should cost 1
+        # pass.
+        sharenum = sorted(tw_vectors.keys())[0]
+        _, data_vector, new_length = tw_vectors[sharenum]
+        current_length = get_implied_data_length(data_vector, new_length)
+
+        do_extend = lambda: self.storage_server.doRemoteCall(
+            "slot_testv_and_readv_and_writev",
+            (),
+            dict(
+                passes=[],
+                storage_index=storage_index,
+                secrets=secrets,
+                tw_vectors={
+                    sharenum: make_data_vector(current_length),
+                },
+                r_vector=[],
+            ),
+        )
+
+        try:
+            result = do_extend()
+        except MorePassesRequired as e:
+            self.assertThat(
+                e.required_count,
+                Equals(1),
+            )
+        else:
+            self.fail("expected MorePassesRequired, got {}".format(result))
+
+    @given(
+        storage_index=storage_indexes(),
+        secrets=tuples(
+            write_enabler_secrets(),
+            lease_renew_secrets(),
+            lease_cancel_secrets(),
+        ),
+        test_and_write_vectors_for_shares=test_and_write_vectors_for_shares(),
+    )
+    def test_extend_mutable_with_new_length_fails_without_passes(self, storage_index, secrets, test_and_write_vectors_for_shares):
+        """
+        If ``remote_slot_testv_and_readv_and_writev`` is invoked to increase
+        storage usage by supplying a ``new_length`` greater than the current
+        share size and without supplying passes, the operation fails with
+        ``MorePassesRequired``.
+        """
+        return self._test_extend_mutable_fails_without_passes(
+            storage_index,
+            secrets,
+            test_and_write_vectors_for_shares,
+            lambda current_length: (
+                [],
+                [],
+                current_length + BYTES_PER_PASS,
+            ),
+        )
 
-    # TODO
-    # a write that increases the storage cost of the share requires passes too
+    @given(
+        storage_index=storage_indexes(),
+        secrets=tuples(
+            write_enabler_secrets(),
+            lease_renew_secrets(),
+            lease_cancel_secrets(),
+        ),
+        test_and_write_vectors_for_shares=test_and_write_vectors_for_shares(),
+    )
+    def test_extend_mutable_with_write_fails_without_passes(self, storage_index, secrets, test_and_write_vectors_for_shares):
+        """
+        If ``remote_slot_testv_and_readv_and_writev`` is invoked to increase
+        storage usage by performing a write past the end of a share without
+        supplying passes, the operation fails with ``MorePassesRequired``.
+        """
+        return self._test_extend_mutable_fails_without_passes(
+            storage_index,
+            secrets,
+            test_and_write_vectors_for_shares,
+            lambda current_length: (
+                [],
+                [(current_length, "x" * BYTES_PER_PASS)],
+                None,
+            ),
+        )
diff --git a/zkapauthorizer.nix b/zkapauthorizer.nix
index ae7453630fde5af7ae11afb728cdd9563f109f8c..6572ed3005acd776bb983aee9dc33488dcbfab0b 100644
--- a/zkapauthorizer.nix
+++ b/zkapauthorizer.nix
@@ -1,7 +1,7 @@
 { buildPythonPackage, sphinx, circleci-cli
 , attrs, zope_interface, twisted, tahoe-lafs, privacypass
 , fixtures, testtools, hypothesis, pyflakes, treq, coverage
-, hypothesisProfile ? null
+, hypothesisProfile ? "default"
 , collectCoverage ? false
 }:
 buildPythonPackage rec {
@@ -34,10 +34,12 @@ buildPythonPackage rec {
     treq
   ];
 
+
+
   checkPhase = ''
     runHook preCheck
     "${pyflakes}/bin/pyflakes" src/_zkapauthorizer
-    python -m ${if collectCoverage
+    ZKAPAUTHORIZER_HYPOTHESIS_PROFILE=${hypothesisProfile} python -m ${if collectCoverage
       then "coverage run --branch --source _zkapauthorizer,twisted.plugins.zkapauthorizer --module"
       else ""
     } twisted.trial _zkapauthorizer