diff --git a/src/_zkapauthorizer/_storage_server.py b/src/_zkapauthorizer/_storage_server.py
index d1dc66882fd80c4bda6c4d23a7e1b46baec51f47..4bbb74e627185d0ff681bf55c1a0179e96abeb10 100644
--- a/src/_zkapauthorizer/_storage_server.py
+++ b/src/_zkapauthorizer/_storage_server.py
@@ -34,11 +34,14 @@ import attr
 from allmydata.interfaces import RIStorageServer
 from allmydata.storage.common import storage_index_to_dir
 from allmydata.util.base32 import b2a
+from allmydata.storage.shares import get_share_file
+from allmydata.storage.lease import LeaseInfo
 from attr.validators import instance_of, provides
 from challenge_bypass_ristretto import SigningKey, TokenPreimage, VerificationSignature
 from eliot import start_action
 from foolscap.api import Referenceable
 from foolscap.ipb import IReferenceable, IRemotelyCallable
+from twisted.python.filepath import FilePath
 from twisted.internet.defer import Deferred
 from twisted.internet.interfaces import IReactorTime
 from twisted.python.reflect import namedAny
@@ -50,7 +53,6 @@ from .storage_common import (
     add_lease_message,
     allocate_buckets_message,
     get_required_new_passes_for_mutable_write,
-    has_writes,
     pass_value_attribute,
     required_passes,
     slot_testv_and_readv_and_writev_message,
@@ -177,7 +179,17 @@ class ZKAPAuthorizerStorageServer(Referenceable):
     )
 
     def __attrs_post_init__(self):
-        self._original.implicit_lease_renewal = False
+        # Avoid the default StorageServer ``allocate_buckets`` behavior of
+        # renewing leases on all existing shares in the same bucket.  It will
+        # still add leases to the newly uploaded shares.
+        self._original.set_implicit_bucket_lease_renewal(False)
+
+        # Similarly, wrapped ``slot_testv_and_readv_and_writev_message``
+        # renews leases on all shares that are being modified.  Turn that
+        # behavior off.  This means we have to take responsibility for
+        # creating the initial lease on shares when they are created (and we
+        # do in our wrapper for ``slot_testv_and_readv_and_writev_message``).
+        self._original.set_implicit_slot_lease_renewal(False)
 
     def remote_get_version(self):
         """
@@ -208,7 +220,7 @@ class ZKAPAuthorizerStorageServer(Referenceable):
 
         # Note: The *allocate_buckets* protocol allows for some shares to
         # already exist on the server.  When this is the case, the cost of the
-        # operation is based only on the buckets which are really allocated
+        # operation is based only on the shares which are really allocated
         # here.  It's not clear if we can allow the client to supply the
         # reduced number of passes in the call but we can be sure to only mark
         # as spent enough passes to cover the allocated buckets.  The return
@@ -298,12 +310,25 @@ class ZKAPAuthorizerStorageServer(Referenceable):
         r_vector,
     ):
         """
-        Pass-through after a pass check to ensure clients can only allocate
-        storage for mutable shares if they present valid passes.
+        Perform a test-and-set on a number of shares in a given slot.
 
-        :note: This method can be used both to allocate storage and to rewrite
-            data in already-allocated storage.  These cases may not be the
-            same from the perspective of pass validation.
+        Optionally, also read some data to be returned before writing any
+        changes.
+
+        If storage-time will be allocated by the operation then validate the
+        given passes and ensure they are of sufficient quantity to pay for the
+        allocation.
+
+        Specifically, passes are required in the following cases:
+
+        * If shares are created then a lease is added to them.
+          Passes are required for the full size of the share.
+
+        * If shares without unexpired leases are written then a lease is added to them.
+          Passes are required for the full size of the shares after the write.
+
+        * If shares with unexpired leases are made larger.
+          Passes are required for the difference in price between the old and new size.
         """
         with start_action(
             action_type=u"zkapauthorizer:storage-server:remote:slot-testv-and-readv-and-writev",
@@ -329,56 +354,46 @@ class ZKAPAuthorizerStorageServer(Referenceable):
         tw_vectors,
         r_vector,
     ):
-        # Only writes to shares without an active lease will result in a lease
-        # renewal.
-        renew_leases = False
-
-        if has_writes(tw_vectors):
-            # Passes may be supplied with the write to create the
-            # necessary lease as part of the same operation.  This must be
-            # supported because there is no separate protocol action to
-            # *create* a slot.  Clients just begin writing to it.
-            validation = _ValidationResult.validate_passes(
-                slot_testv_and_readv_and_writev_message(storage_index),
-                passes,
-                self._signing_key,
-            )
-            if has_active_lease(self._original, storage_index, self._clock.seconds()):
-                # Some of the storage is paid for already.
-                current_sizes = dict(
-                    get_share_sizes(
-                        self._original,
-                        storage_index,
-                        tw_vectors.keys(),
-                    )
-                )
-                # print("has writes, has active lease, current sizes: {}".format(current_sizes))
-            else:
-                # None of it is.
-                current_sizes = {}
-                renew_leases = True
+        # Get a stable time to use for all lease expiration checks that are
+        # part of this call.
+        now = self._clock.seconds()
 
-            required_new_passes = get_required_new_passes_for_mutable_write(
-                self._pass_value,
-                current_sizes,
-                tw_vectors,
-            )
-            if required_new_passes > len(validation.valid):
-                validation.raise_for(required_new_passes)
-
-        # Skip over the remotely exposed method and jump to the underlying
-        # implementation which accepts one additional parameter that we know
-        # about (and don't expose over the network): renew_leases.  We always
-        # pass False for this because we want to manage leases completely
-        # separately from writes.
-        return self._original.slot_testv_and_readv_and_writev(
+        # Check passes for cryptographic validity.
+        validation = _ValidationResult.validate_passes(
+            slot_testv_and_readv_and_writev_message(storage_index),
+            passes,
+            self._signing_key,
+        )
+
+        # Check number of passes to see if they pay for the operation.
+        required_new_passes = get_writev_price(
+            self._original,
+            self._pass_value,
+            storage_index,
+            tw_vectors,
+            now,
+        )
+
+        # Fail the operation right now if there aren't enough valid passes for
+        # the operation.
+        if required_new_passes > len(validation.valid):
+            validation.raise_for(required_new_passes)
+
+        # Perform the operation.
+        result = self._original.remote_slot_testv_and_readv_and_writev(
             storage_index,
             secrets,
             tw_vectors,
             r_vector,
-            renew_leases=renew_leases,
         )
 
+        # Add the lease that we charged the client for - leases on any written
+        # shares without an unexpired least.
+        add_leases_for_writev(self._original, storage_index, secrets, tw_vectors, now)
+
+        # Propagate the result of the operation.
+        return result
+
     def remote_slot_readv(self, *a, **kw):
         """
         Pass-through without a pass check to let clients read mutable shares as
@@ -387,6 +402,86 @@ class ZKAPAuthorizerStorageServer(Referenceable):
         return self._original.remote_slot_readv(*a, **kw)
 
 
+def add_leases_for_writev(storage_server, storage_index, secrets, tw_vectors, now):
+    """
+    Add a new lease using the given secrets to all shares written by
+    ``tw_vectors``.
+    """
+    for (sharenum, sharepath) in get_all_share_paths(storage_server, storage_index):
+        testv, datav, new_length = tw_vectors.get(sharenum, (None, b"", None))
+        if datav or (new_length is not None):
+            # It has data or a new length - it is a write.
+            if share_has_active_leases(storage_server, storage_index, sharenum, now):
+                # It's fine, leave it be.
+                continue
+
+            # Aha.  It has no lease that hasn't expired.  Give it one.
+            (write_enabler, renew_secret, cancel_secret) = secrets
+            share = get_share_file(sharepath)
+            share.add_or_renew_lease(
+                LeaseInfo(
+                    owner_num=1,
+                    renew_secret=renew_secret,
+                    cancel_secret=cancel_secret,
+                    expiration_time=now + 60 * 60 * 24 * 31,
+                    nodeid=storage_server.my_nodeid,
+                ),
+            )
+
+
+def get_share_path(storage_server, storage_index, sharenum):
+    return (
+        FilePath(storage_server.sharedir)
+        .preauthChild(storage_index_to_dir(storage_index))
+        .child(u"{}".format(sharenum))
+        .path
+    )
+
+
+def share_has_active_leases(storage_server, storage_index, sharenum, now):
+    sharepath = get_share_path(storage_server, storage_index, sharenum)
+    share = get_share_file(sharepath)
+    return any(lease.get_expiration_time() > now for lease in share.get_leases())
+
+
+def get_writev_price(storage_server, pass_value, storage_index, tw_vectors, now):
+    """
+    Determine the price to execute the given test/write vectors.
+    """
+    # Find the current size of shares being written.
+    current_sizes = dict(
+        get_share_sizes(
+            storage_server,
+            storage_index,
+            tw_vectors.keys(),
+        ),
+    )
+
+    # Zero out the size of any share without an unexpired lease.  We will
+    # renew the lease on this share along with the write but the client
+    # must supply the necessary passes to do so.
+    current_sizes.update(
+        {
+            sharenum: 0
+            for sharenum in current_sizes
+            if not share_has_active_leases(
+                storage_server,
+                storage_index,
+                sharenum,
+                now,
+            )
+        }
+    )
+
+    # Compute the number of passes required to execute the given writev
+    # against these existing shares.
+    return get_required_new_passes_for_mutable_write(
+        pass_value,
+        current_sizes,
+        tw_vectors,
+    )
+
+
 def has_active_lease(storage_server, storage_index, now):
     """
     :param allmydata.storage.server.StorageServer storage_server: A storage
diff --git a/src/_zkapauthorizer/tests/foolscap.py b/src/_zkapauthorizer/tests/foolscap.py
index 3a984bea163fd4c567812556f8229508c0cb8a2d..9cef79e99c4cdc687f55ff8d627a646f5036a508 100644
--- a/src/_zkapauthorizer/tests/foolscap.py
+++ b/src/_zkapauthorizer/tests/foolscap.py
@@ -37,7 +37,11 @@ class RIEcho(RemoteInterface):
 
 @implementer(RIStorageServer)
 class StubStorageServer(object):
-    pass
+    def set_implicit_bucket_lease_renewal(self, enabled):
+        pass
+
+    def set_implicit_slot_lease_renewal(self, enabled):
+        pass
 
 
 def get_anonymous_storage_server():
diff --git a/src/_zkapauthorizer/tests/test_storage_protocol.py b/src/_zkapauthorizer/tests/test_storage_protocol.py
index 5ce1770365452677995015641aa16551975b37a1..a4e4136066b103356f319a1e73c0d2469a58affe 100644
--- a/src/_zkapauthorizer/tests/test_storage_protocol.py
+++ b/src/_zkapauthorizer/tests/test_storage_protocol.py
@@ -862,6 +862,7 @@ class ShareTests(TestCase):
         *slot_testv_and_readv_and_writev* any leases on the corresponding slot
         remain the same.
         """
+
         def leases():
             return list(
                 lease.to_mutable_data()