diff --git a/docs/source/designs/costs.rst b/docs/source/designs/costs.rst
new file mode 100644
index 0000000000000000000000000000000000000000..0a98df368167fd5aab9b6e2b1a3f6f12fb0e9106
--- /dev/null
+++ b/docs/source/designs/costs.rst
@@ -0,0 +1,104 @@
+Costs
+=====
+
+ZKAPAuthorizer defines costs for certain Tahoe-LAFS storage operations.
+It overlays its own protocol on the Tahoe-LAFS storage protocol which accepts ZKAPs as payments along with these operations.
+The underlying storage operations are only allowed when the supplied payment covers the cost.
+
+Storage-Time
+============
+
+Storage servers incur a storage cost over time to provide service to storage clients.
+A storage server must hold ciphertext from the time it is uploaded until the last time a client needs to download it.
+
+The unit of cost ZKAPAuthorizer imposes is storage × time.
+The currency used by ZKAPAuthorizer is a (Z)ero (K)nowledge (A)ccess (P)ass -- a ZKAP.
+If a ZKAP is worth 1 MB × 1 month (configurable per-grid) then a client must spend 1 ZKAP to store up to 1 MB for up to 1 month.
+To store up to 1 MB for up to 2 months a client spends 2 ZKAPs.
+To store up to 2 MB for up to 1 month a client spends 2 ZKAPs.
+
+A ZKAP is the smallest unit of the currency.
+When sizes or times do not fall on integer multiples of 1 MB or 1 month the cost is rounded up.
+
+Leases
+------
+
+The period of time a Tahoe-LAFS storage server promises to retain a share is controlled by "leases".
+A lease has an expiration time after which it is no longer effective.
+A lease is associated with a single share.
+As long as at least one lease has not expired a storage server will keep that share.
+Clients are required to periodically "renew" leases for shares they wish the server to keep.
+
+The length of a lease (1 month) provides the "time" component of storage-time.
+
+Here are some examples:
+
+* renewing the lease on a 100 KB share costs 1 ZKAP
+* renewing the lease on a 1 MB share costs 1 ZKAP
+* renewing the lease on a 1.5 MB share costs 2 ZKAPs
+* renewing the lease on a 10 MB share costs 10 ZKAPs
+
+Renewing a lease sets the expiration time to be 1 month after the time of the operation.
+
+Shares
+------
+
+Tahoe-LAFS storage servers accept "shares" for storage.
+Immutable data is represented as shares in "buckets".
+Mutable data is represented as shares in "slots".
+All shares in the same bucket (or slot) relate to the same "file".
+
+The size of a share provides the "storage" component of storage-time.
+
+Immutable Data
+~~~~~~~~~~~~~~
+
+The original Tahoe-LAFS storage protocol automatically adds a lease to all immutable shares it receives at the time the upload completes.
+It also automatically renews leases on all shares in the same bucket as the newly uploaded share.
+
+When ZKAPAuthorizer is used newly uploaded immutable shares still have a lease added to them.
+The behavior of renewing leases on all other shares in the same bucket is disabled.
+
+The cost of uploading an immutable share is the size of the share times the duration of a lease.
+Here are some examples:
+
+* a 100 KB share costs 1 ZKAP to upload
+* a 1 MB share costs 1 ZKAP to upload
+* a 1.5 MB share costs 2 ZKAPs to upload
+* a 10 MB share costs 10 ZKAPs to upload
+
+Mutable Data
+~~~~~~~~~~~~
+
+The original Tahoe-LAFS storage protocol automatically renews leases on mutable shares when they are first created and whenever they are changed.
+
+When ZKAPAuthorizer is used newly uploaded mutable shares still have a lease added to them.
+The behavior of renewing leases on all changed shares is disabled.
+
+The cost of creating a mutable share is the size of the share times the duration of a lease.
+This is exactly the same method as is used to compute the cost of uploading an immutable share.
+
+The cost of modifying a mutable share is based on the change in size that results:
+the cost of the share before the change is subtracted from the cost of the share after the change.
+If the cost is negative it is considered to be zero.
+
+Here are some examples:
+
+* creating a 100 KB share costs 1 ZKAP
+* extending a 100 KB share to 200 KB is free
+* extending a 1 MB share to 1.5 MB costs 1 ZKAP
+* extending a 1.5 MB share to 2 MB is free
+* extending a 2 MB share to 10 MB costs 8 ZKAPs
+* truncating a 10 MB share to 2 MB is free
+* rewriting the contents of a 5 MB share without changing its length is free
+
+Note that leases are *not* renewed when a mutable share is modified.
+When the modification has a positive cost this results in the client being overcharged.
+The amount of the overcharge is a function of three variables:
+
+* The **lease period** currently fixed at 31 days.
+* The **remaining lease time** which is the difference between the time when the current lease expires and the time of the operation.
+* The **price increase** which is the number of ZKAPs the modification costs.
+
+The amount of the overcharge is **lease period remaining** / **lease period** × **price increase**.
+See <https://github.com/PrivateStorageio/ZKAPAuthorizer/issues/254> for efforts to remedy this.
diff --git a/src/_zkapauthorizer/_storage_client.py b/src/_zkapauthorizer/_storage_client.py
index 3ae35903b71b581052237858f01adccac117355e..622299f6bd171f8905433aa4b48550e643efdca4 100644
--- a/src/_zkapauthorizer/_storage_client.py
+++ b/src/_zkapauthorizer/_storage_client.py
@@ -26,6 +26,7 @@ from functools import partial, wraps
 
 import attr
 from allmydata.interfaces import IStorageServer
+from allmydata.util.eliotutil import log_call_deferred
 from attr.validators import provides
 from eliot.twisted import inline_callbacks
 from twisted.internet.defer import returnValue
@@ -39,7 +40,7 @@ from .storage_common import (
     add_lease_message,
     allocate_buckets_message,
     get_required_new_passes_for_mutable_write,
-    has_writes,
+    get_write_sharenums,
     pass_value_attribute,
     required_passes,
     slot_testv_and_readv_and_writev_message,
@@ -411,6 +412,7 @@ class ZKAPAuthorizerStorageClient(object):
             reason,
         )
 
+    @log_call_deferred("zkapauthorizer:storage-client:slot_testv_and_readv_and_writev")
     @inline_callbacks
     @with_rref
     def slot_testv_and_readv_and_writev(
@@ -441,7 +443,8 @@ class ZKAPAuthorizerStorageClient(object):
             ) in tw_vectors.items()
         }
 
-        if has_writes(tw_vectors):
+        write_sharenums = get_write_sharenums(tw_vectors)
+        if len(write_sharenums) > 0:
             # When performing writes, if we're increasing the storage
             # requirement, we need to spend more passes.  Unfortunately we
             # don't know what the current storage requirements are at this
@@ -463,6 +466,9 @@ class ZKAPAuthorizerStorageClient(object):
                 sharenum: stat.size
                 for (sharenum, stat) in stats.items()
                 if stat.lease_expiration > now
+                # Also, the size of any share we're not writing to doesn't
+                # matter.
+                and sharenum in write_sharenums
             }
             # Determine the cost of the new storage for the operation.
             num_passes = get_required_new_passes_for_mutable_write(
diff --git a/src/_zkapauthorizer/_storage_server.py b/src/_zkapauthorizer/_storage_server.py
index e379989c36f4ae1054da88791e8604a9c1fc92f6..e54f3780787bf500937188b512411cc3b027e074 100644
--- a/src/_zkapauthorizer/_storage_server.py
+++ b/src/_zkapauthorizer/_storage_server.py
@@ -34,15 +34,17 @@ import attr
 from allmydata.interfaces import RIStorageServer
 from allmydata.storage.common import storage_index_to_dir
 from allmydata.storage.immutable import ShareFile
+from allmydata.storage.lease import LeaseInfo
 from allmydata.storage.mutable import MutableShareFile
 from allmydata.storage.shares import get_share_file
 from allmydata.util.base32 import b2a
 from attr.validators import instance_of, provides
 from challenge_bypass_ristretto import SigningKey, TokenPreimage, VerificationSignature
-from eliot import start_action
+from eliot import log_call, start_action
 from foolscap.api import Referenceable
 from twisted.internet.defer import Deferred
 from twisted.internet.interfaces import IReactorTime
+from twisted.python.filepath import FilePath
 from twisted.python.reflect import namedAny
 from zope.interface import implementer
 
@@ -52,7 +54,7 @@ from .storage_common import (
     add_lease_message,
     allocate_buckets_message,
     get_required_new_passes_for_mutable_write,
-    has_writes,
+    get_write_sharenums,
     pass_value_attribute,
     required_passes,
     slot_testv_and_readv_and_writev_message,
@@ -292,7 +294,7 @@ class ZKAPAuthorizerStorageServer(Referenceable):
     def remote_stat_shares(self, storage_indexes_or_slots):
         # type: (List[bytes]) -> List[Dict[int, ShareStat]]
         return list(
-            dict(stat_share(self._original, storage_index_or_slot))
+            dict(get_share_stats(self._original, storage_index_or_slot, None))
             for storage_index_or_slot in storage_indexes_or_slots
         )
 
@@ -305,12 +307,26 @@ class ZKAPAuthorizerStorageServer(Referenceable):
         r_vector,
     ):
         """
-        Pass-through after a pass check to ensure clients can only allocate
-        storage for mutable shares if they present valid passes.
+        Perform a test-and-set on a number of shares in a given slot.
 
-        :note: This method can be used both to allocate storage and to rewrite
-            data in already-allocated storage.  These cases may not be the
-            same from the perspective of pass validation.
+        Optionally, also read some data to be returned before writing any
+        changes.
+
+        If storage-time will be allocated by the operation then validate the
+        given passes and ensure they are of sufficient quantity to pay for the
+        allocation.
+
+        Specifically, passes are required in the following cases:
+
+        * If shares are created then a lease is added to them.
+          Passes are required for the full size of the share.
+
+        * If shares without unexpired leases are written then a lease is added to them.
+          Passes are required for the full size of the shares after the write.
+
+        * If shares with unexpired leases are made larger.
+          Passes are required for the difference in price between the old and new size.
+          Note that the lease is *not* renewed in this case (see #254).
         """
         with start_action(
             action_type=u"zkapauthorizer:storage-server:remote:slot-testv-and-readv-and-writev",
@@ -336,54 +352,59 @@ class ZKAPAuthorizerStorageServer(Referenceable):
         tw_vectors,
         r_vector,
     ):
-        # Only writes to shares without an active lease will result in a lease
-        # renewal.
-        renew_leases = False
-
-        if has_writes(tw_vectors):
-            # Passes may be supplied with the write to create the
-            # necessary lease as part of the same operation.  This must be
-            # supported because there is no separate protocol action to
-            # *create* a slot.  Clients just begin writing to it.
-            validation = _ValidationResult.validate_passes(
-                slot_testv_and_readv_and_writev_message(storage_index),
-                passes,
-                self._signing_key,
-            )
-            if has_active_lease(self._original, storage_index, self._clock.seconds()):
-                # Some of the storage is paid for already.
-                current_sizes = dict(
-                    get_share_sizes(
-                        self._original,
-                        storage_index,
-                        tw_vectors.keys(),
-                    )
-                )
-                # print("has writes, has active lease, current sizes: {}".format(current_sizes))
-            else:
-                # None of it is.
-                current_sizes = {}
-                renew_leases = True
+        # Get a stable time to use for all lease expiration checks that are
+        # part of this call.
+        now = self._clock.seconds()
 
-            required_new_passes = get_required_new_passes_for_mutable_write(
-                self._pass_value,
-                current_sizes,
-                tw_vectors,
-            )
-            if required_new_passes > len(validation.valid):
-                validation.raise_for(required_new_passes)
+        # Check passes for cryptographic validity.
+        validation = _ValidationResult.validate_passes(
+            slot_testv_and_readv_and_writev_message(storage_index),
+            passes,
+            self._signing_key,
+        )
 
-        # Skip over the remotely exposed method and jump to the underlying
-        # implementation which accepts one additional parameter that we know
-        # about (and don't expose over the network): renew_leases.
-        return self._original.slot_testv_and_readv_and_writev(
+        # Inspect the operation to determine its price based on any
+        # allocations.
+        required_new_passes = get_writev_price(
+            self._original,
+            self._pass_value,
+            storage_index,
+            tw_vectors,
+            now,
+        )
+
+        # Fail the operation right now if there aren't enough valid passes to
+        # cover the price.
+        if required_new_passes > len(validation.valid):
+            validation.raise_for(required_new_passes)
+
+        # Perform the operation.
+        result = self._original.slot_testv_and_readv_and_writev(
             storage_index,
             secrets,
             tw_vectors,
             r_vector,
-            renew_leases=renew_leases,
+            # Disable all lease renewal logic from the wrapped storage server.
+            # We'll add or renew leases based on our billing model.
+            renew_leases=False,
         )
 
+        # Add the leases that we charged the client for.  This includes:
+        #
+        #  - leases on newly created shares
+        #
+        #  - leases on existing, modified shares without an unexpired lease
+        #
+        # Note it does not include existing shares that grew enough to be more
+        # expensive.  The operation was required to pay the full price
+        # difference but this only grants storage for the remainder of the
+        # existing lease period.  This results in the client being overcharged
+        # somewhat.
+        add_leases_for_writev(self._original, storage_index, secrets, tw_vectors, now)
+
+        # Propagate the result of the operation.
+        return result
+
     def remote_slot_readv(self, *a, **kw):
         """
         Pass-through without a pass check to let clients read mutable shares as
@@ -392,23 +413,6 @@ class ZKAPAuthorizerStorageServer(Referenceable):
         return self._original.remote_slot_readv(*a, **kw)
 
 
-def has_active_lease(storage_server, storage_index, now):
-    """
-    :param allmydata.storage.server.StorageServer storage_server: A storage
-        server to use to look up lease information.
-
-    :param bytes storage_index: A storage index to use to look up lease
-        information.
-
-    :param float now: The current time as a POSIX timestamp.
-
-    :return bool: ``True`` if any only if the given storage index has a lease
-        with an expiration time after ``now``.
-    """
-    leases = storage_server.get_slot_leases(storage_index)
-    return any(lease.get_expiration_time() > now for lease in leases)
-
-
 def check_pass_quantity(pass_value, validation, share_sizes):
     """
     Check that the given number of passes is sufficient to cover leases for
@@ -524,6 +528,10 @@ def get_all_share_numbers(storage_server, storage_index):
         yield sharenum
 
 
+@log_call(
+    action_type="zkapauthorizer:storage-server:get-share-sizes",
+    include_args=["storage_index_or_slot", "sharenums"],
+)
 def get_share_sizes(storage_server, storage_index_or_slot, sharenums):
     """
     Get sizes of the given share numbers for the given storage index *or*
@@ -531,11 +539,10 @@ def get_share_sizes(storage_server, storage_index_or_slot, sharenums):
 
     :see: ``get_share_stats``
 
-    :return: A generator of tuples of (int, int) where the first element is a
-        share number and the second element is the data size for that share
-        number.
+    :return: A list of tuples of (int, int) where the first element is a share
+        number and the second element is the data size for that share number.
     """
-    return (
+    return list(
         (sharenum, stat.size)
         for (sharenum, stat) in get_share_stats(
             storage_server, storage_index_or_slot, sharenums
@@ -677,22 +684,6 @@ def get_slot_share_size(sharepath):
         return share_data_length
 
 
-def stat_share(storage_server, storage_index_or_slot):
-    """
-    Get a ``ShareStat`` for each share in a bucket or a slot.
-
-    :return: An iterator of two-tuples of share number and corresponding
-        ``ShareStat``.
-    """
-    stat = None
-    for sharenum, sharepath in get_all_share_paths(
-        storage_server, storage_index_or_slot
-    ):
-        if stat is None:
-            stat = get_stat(sharepath)
-        yield (sharenum, stat(storage_server, storage_index_or_slot, sharepath))
-
-
 def get_stat(sharepath):
     """
     Get a function that can retrieve the metadata from the share at the given
@@ -707,3 +698,97 @@ def get_stat(sharepath):
             return stat_slot
         else:
             return stat_bucket
+
+
+def add_leases_for_writev(storage_server, storage_index, secrets, tw_vectors, now):
+    """
+    Add a new lease using the given secrets to all shares written by
+    ``tw_vectors``.
+    """
+    for (sharenum, sharepath) in get_all_share_paths(storage_server, storage_index):
+        testv, datav, new_length = tw_vectors.get(sharenum, (None, b"", None))
+        if datav or (new_length is not None):
+            # It has data or a new length - it is a write.
+            if share_has_active_leases(storage_server, storage_index, sharenum, now):
+                # It's fine, leave it be.
+                continue
+
+            # Aha.  It has no lease that hasn't expired.  Give it one.
+            (write_enabler, renew_secret, cancel_secret) = secrets
+            share = get_share_file(sharepath)
+            share.add_or_renew_lease(
+                LeaseInfo(
+                    owner_num=1,
+                    renew_secret=renew_secret,
+                    cancel_secret=cancel_secret,
+                    expiration_time=now
+                    + ZKAPAuthorizerStorageServer.LEASE_PERIOD.total_seconds(),
+                    nodeid=storage_server.my_nodeid,
+                ),
+            )
+
+
+def get_share_path(storage_server, storage_index, sharenum):
+    # type: (StorageServer, bytes, int) -> FilePath
+    """
+    Get the path to the given storage server's storage for the given share.
+    """
+    return (
+        FilePath(storage_server.sharedir)
+        .preauthChild(storage_index_to_dir(storage_index))
+        .child(u"{}".format(sharenum))
+    )
+
+
+def share_has_active_leases(storage_server, storage_index, sharenum, now):
+    # type: (StorageServer, bytes, int, float) -> bool
+    """
+    Determine whether the given share on the given server has an unexpired
+    lease or not.
+
+    :return: ``True`` if it has at least one unexpired lease, ``False``
+        otherwise.
+    """
+    sharepath = get_share_path(storage_server, storage_index, sharenum)
+    share = get_share_file(sharepath.path)
+    return any(lease.get_expiration_time() > now for lease in share.get_leases())
+
+
+def get_writev_price(storage_server, pass_value, storage_index, tw_vectors, now):
+    # type: (StorageServer, int, bytes, TestWriteVectors, float) -> int
+    """
+    Determine the price to execute the given test/write vectors.
+    """
+    # Find the current size of shares being written.
+    current_sizes = dict(
+        get_share_sizes(
+            storage_server,
+            storage_index,
+            # Here's how we restrict the result to only written shares.
+            sharenums=get_write_sharenums(tw_vectors),
+        ),
+    )
+
+    # Zero out the size of any share without an unexpired lease.  We will
+    # renew the lease on this share along with the write but the client
+    # must supply the necessary passes to do so.
+    current_sizes.update(
+        {
+            sharenum: 0
+            for sharenum in current_sizes
+            if not share_has_active_leases(
+                storage_server,
+                storage_index,
+                sharenum,
+                now,
+            )
+        }
+    )
+
+    # Compute the number of passes required to execute the given writev
+    # against these existing shares.
+    return get_required_new_passes_for_mutable_write(
+        pass_value,
+        current_sizes,
+        tw_vectors,
+    )
diff --git a/src/_zkapauthorizer/storage_common.py b/src/_zkapauthorizer/storage_common.py
index 9af2a922fda5ce161a02294438842a3f3f27e5c3..908a6ec755e6499b77ac28a4fbaeb15d1d04d5d8 100644
--- a/src/_zkapauthorizer/storage_common.py
+++ b/src/_zkapauthorizer/storage_common.py
@@ -203,7 +203,7 @@ def has_writes(tw_vectors):
     )
 
 
-def get_sharenums(tw_vectors):
+def get_write_sharenums(tw_vectors):
     """
     :param tw_vectors: See
         ``allmydata.interfaces.TestAndWriteVectorsForShares``.
@@ -211,7 +211,13 @@ def get_sharenums(tw_vectors):
     :return set[int]: The share numbers which the given test/write vectors would write to.
     """
     return set(
-        sharenum for (sharenum, (test, data, new_length)) in tw_vectors.items() if data
+        # This misses cases where `data` is empty but `new_length` is
+        # non-None, non-0.
+        #
+        # Related to #222.
+        sharenum
+        for (sharenum, (test, data, new_length)) in tw_vectors.items()
+        if data
     )
 
 
@@ -270,7 +276,6 @@ def get_required_new_passes_for_mutable_write(pass_value, current_sizes, tw_vect
         if size > new_sizes.get(sharenum, 0):
             new_sizes[sharenum] = size
 
-    new_sizes.update()
     new_passes = required_passes(
         pass_value,
         new_sizes.values(),
@@ -289,14 +294,14 @@ def get_required_new_passes_for_mutable_write(pass_value, current_sizes, tw_vect
 
 def summarize(tw_vectors):
     return {
-        sharenum: (
-            list(
+        sharenum: {
+            "testv": list(
                 (offset, length, operator, len(specimen))
                 for (offset, length, operator, specimen) in test_vector
             ),
-            list((offset, len(data)) for (offset, data) in data_vectors),
-            new_length,
-        )
+            "datav": list((offset, len(data)) for (offset, data) in data_vectors),
+            "new_length": new_length,
+        }
         for (sharenum, (test_vector, data_vectors, new_length)) in tw_vectors.items()
     }
 
diff --git a/src/_zkapauthorizer/tests/strategies.py b/src/_zkapauthorizer/tests/strategies.py
index 567c94405e6aac2009255ead776784e22ab95bad..d14a62dfb110223846b4b8a772ea6e3dfe3a59e2 100644
--- a/src/_zkapauthorizer/tests/strategies.py
+++ b/src/_zkapauthorizer/tests/strategies.py
@@ -809,7 +809,10 @@ def slot_test_and_write_vectors():
         slot_test_vectors(),
         slot_data_vectors(),
         one_of(
+            # The new length might be omitted completely.
             just(None),
+            # Or it might be given as an integer.  Allow a zero size which
+            # means "delete this share" in this context.
             sizes(),
         ),
     )
diff --git a/src/_zkapauthorizer/tests/test_storage_protocol.py b/src/_zkapauthorizer/tests/test_storage_protocol.py
index 32e7e406380eaff0ce140f22b37db0cf394ae0a3..6e3fd1222470ab0959d8f936177c1f1d78ba68d0 100644
--- a/src/_zkapauthorizer/tests/test_storage_protocol.py
+++ b/src/_zkapauthorizer/tests/test_storage_protocol.py
@@ -27,6 +27,7 @@ from hypothesis import assume, given
 from hypothesis.strategies import data as data_strategy
 from hypothesis.strategies import integers, lists, sets, tuples
 from testtools import TestCase
+from testtools.content import text_content
 from testtools.matchers import (
     AfterPreprocessing,
     Always,
@@ -449,20 +450,6 @@ class ShareTests(TestCase):
             ),
         )
 
-        def get_lease_grant_times(storage_server, storage_index):
-            """
-            Get the grant times for all of the leases for all of the shares at the
-            given storage index.
-            """
-            shares = storage_server._get_bucket_shares(storage_index)
-            for sharenum, sharepath in shares:
-                sharefile = get_share_file(sharepath)
-                leases = sharefile.get_leases()
-                grant_times = list(
-                    lease.get_grant_renew_time_time() for lease in leases
-                )
-                yield sharenum, grant_times
-
         expected_leases = {}
         # Chop off the non-integer part of the expected values because share
         # files only keep integer precision.
@@ -849,40 +836,68 @@ class ShareTests(TestCase):
             lease_renew_secrets(),
             lease_cancel_secrets(),
         ),
-        test_and_write_vectors_for_shares=slot_test_and_write_vectors_for_shares(),
+        share_vectors=lists(slot_test_and_write_vectors_for_shares(), min_size=1),
+        now=posix_timestamps(),
     )
-    def test_create_mutable(
-        self, storage_index, secrets, test_and_write_vectors_for_shares
-    ):
+    def test_create_mutable(self, storage_index, secrets, share_vectors, now):
         """
         Mutable share data written using *slot_testv_and_readv_and_writev* can be
         read back as-written and without spending any more passes.
         """
-        wrote, read = extract_result(
-            self.client.slot_testv_and_readv_and_writev(
+        self.clock.advance(now)
+
+        def write(vector):
+            return self.client.slot_testv_and_readv_and_writev(
                 storage_index,
                 secrets=secrets,
-                tw_vectors={
-                    k: v.for_call()
-                    for (k, v) in test_and_write_vectors_for_shares.items()
-                },
+                tw_vectors={k: v.for_call() for (k, v) in vector.items()},
                 r_vector=[],
-            ),
-        )
-        self.assertThat(
-            wrote,
-            Equals(True),
-            u"Server rejected a write to a new mutable slot",
-        )
-        self.assertThat(
-            read,
-            Equals({}),
-            u"Server gave back read results when we asked for none.",
-        )
-        # Now we can read it back without spending any more passes.
+            )
+
+        grant_times = {}
+        for n, vector in enumerate(share_vectors):
+            # Execute one of the write operations.  It might write to multiple
+            # shares.
+            self.assertThat(
+                write(vector),
+                is_successful_write(),
+            )
+
+            # Track our progress through the list of write vectors for
+            # testtools failure reporting.  Each call overwrites the previous
+            # detail so we can see how far we got, if we happen to fail
+            # somewhere in this loop.
+            self.addDetail("writev-progress", text_content("{}".format(n)))
+
+            # Track the simulated time when each lease receives its lease.
+            # This scenario is constructed so that only the first write to any
+            # given share will result in a lease so we do not allow the grant
+            # time for a given share number to be updated here.  Only
+            # sharenums being written for the first time will capture the time
+            # here.
+            grant_times.update(
+                {
+                    # The time is in a list to make it easier to compare the
+                    # result with the return value of `get_lease_grant_times`
+                    # later.  The time is truncated to the integer portion
+                    # because that is how much precision leases keep.
+                    sharenum: [int(self.clock.seconds())]
+                    for sharenum in vector
+                    if sharenum not in grant_times
+                }
+            )
+
+            # Advance time so the grant times will be distinct.
+            self.clock.advance(1)
+
+        # Now we can read back the last data written without spending any more
+        # passes.
         before_passes = len(self.pass_factory.issued)
         assert_read_back_data(
-            self, storage_index, secrets, test_and_write_vectors_for_shares
+            self,
+            storage_index,
+            secrets,
+            share_vectors[-1],
         )
         after_passes = len(self.pass_factory.issued)
         self.assertThat(
@@ -890,6 +905,17 @@ class ShareTests(TestCase):
             Equals(after_passes),
         )
 
+        # And the lease we paid for on every share is present.
+        self.assertThat(
+            dict(
+                get_lease_grant_times(
+                    self.anonymous_storage_server,
+                    storage_index,
+                )
+            ),
+            Equals(grant_times),
+        )
+
     @given(
         storage_index=storage_indexes(),
         secrets=tuples(
@@ -1028,85 +1054,6 @@ class ShareTests(TestCase):
             ),
         )
 
-    @given(
-        storage_index=storage_indexes(),
-        secrets=tuples(
-            write_enabler_secrets(),
-            lease_renew_secrets(),
-            lease_cancel_secrets(),
-        ),
-        test_and_write_vectors_for_shares=slot_test_and_write_vectors_for_shares(),
-    )
-    def test_client_cannot_control_lease_behavior(
-        self, storage_index, secrets, test_and_write_vectors_for_shares
-    ):
-        """
-        If the client passes ``renew_leases`` to *slot_testv_and_readv_and_writev*
-        it fails with ``TypeError``, no lease is updated, and no share data is
-        written.
-        """
-        # First, tell the client to let us violate the protocol.  It is the
-        # server's responsibility to defend against this attack.
-        self.local_remote_server.check_args = False
-
-        # The nice Python API doesn't let you do this so we drop down to
-        # the layer below.  We also use positional arguments because they
-        # transit the network differently from keyword arguments.  Yay.
-        d = self.local_remote_server.callRemote(
-            "slot_testv_and_readv_and_writev",
-            # passes
-            _encode_passes(
-                self.pass_factory.get(
-                    slot_testv_and_readv_and_writev_message(storage_index),
-                    1,
-                ),
-            ),
-            # storage_index
-            storage_index,
-            # secrets
-            secrets,
-            # tw_vectors
-            {k: v.for_call() for (k, v) in test_and_write_vectors_for_shares.items()},
-            # r_vector
-            [],
-            # add_leases
-            True,
-        )
-
-        # The operation should fail.
-        self.expectThat(
-            d,
-            failed(
-                AfterPreprocessing(
-                    lambda f: f.value,
-                    IsInstance(TypeError),
-                ),
-            ),
-        )
-
-        # There should be no shares at the given storage index.
-        d = self.client.slot_readv(
-            storage_index,
-            # Surprise.  shares=None means all shares.
-            shares=None,
-            r_vector=list(
-                list(map(write_vector_to_read_vector, vector.write_vector))
-                for vector in test_and_write_vectors_for_shares.values()
-            ),
-        )
-        self.expectThat(
-            d,
-            succeeded(
-                Equals({}),
-            ),
-        )
-
-        # And there should be no leases on those non-shares.
-        self.expectThat(
-            list(self.anonymous_storage_server.get_slot_leases(storage_index)),
-            Equals([]),
-        )
-
 
 def assert_read_back_data(
     self, storage_index, secrets, test_and_write_vectors_for_shares
@@ -1161,3 +1108,16 @@ def write_vector_to_read_vector(write_vector):
     write vector.
     """
     return (write_vector[0], len(write_vector[1]))
+
+
+def get_lease_grant_times(storage_server, storage_index):
+    """
+    Get the grant times for all of the leases for all of the shares at the
+    given storage index.
+    """
+    shares = storage_server._get_bucket_shares(storage_index)
+    for sharenum, sharepath in shares:
+        sharefile = get_share_file(sharepath)
+        leases = sharefile.get_leases()
+        grant_times = list(lease.get_grant_renew_time_time() for lease in leases)
+        yield sharenum, grant_times