diff --git a/src/_zkapauthorizer/_storage_client.py b/src/_zkapauthorizer/_storage_client.py
index 7eebdc56a6e567715b2909ed0e9c9a2c03660486..eafc1196f95d5a826cc846f079c452b9c91bfd43 100644
--- a/src/_zkapauthorizer/_storage_client.py
+++ b/src/_zkapauthorizer/_storage_client.py
@@ -37,6 +37,7 @@ from .storage_common import (
     add_lease_message,
     renew_lease_message,
     slot_testv_and_readv_and_writev_message,
+    has_writes,
 )
 
 @implementer(IStorageServer)
@@ -170,9 +171,13 @@ class ZKAPAuthorizerStorageClient(object):
             tw_vectors,
             r_vector,
     ):
+        if has_writes(tw_vectors):
+            passes = self._get_encoded_passes(slot_testv_and_readv_and_writev_message(storage_index), 1)
+        else:
+            passes = []
         return self._rref.callRemote(
             "slot_testv_and_readv_and_writev",
-            self._get_encoded_passes(slot_testv_and_readv_and_writev_message(storage_index), 1),
+            passes,
             storage_index,
             secrets,
             tw_vectors,
diff --git a/src/_zkapauthorizer/_storage_server.py b/src/_zkapauthorizer/_storage_server.py
index 8bae23d92c31d59ee4405d3ccfca4c0c09f30d51..b95eb3b3633c5336211b7f4fb9e9f8e865128849 100644
--- a/src/_zkapauthorizer/_storage_server.py
+++ b/src/_zkapauthorizer/_storage_server.py
@@ -24,6 +24,10 @@ from __future__ import (
     absolute_import,
 )
 
+from functools import (
+    partial,
+)
+
 import attr
 from attr.validators import (
     provides,
@@ -48,6 +52,14 @@ from privacypass import (
     VerificationSignature,
     SigningKey,
 )
+
+from twisted.python.reflect import (
+    namedAny,
+)
+from twisted.internet.interfaces import (
+    IReactorTime,
+)
+
 from .foolscap import (
     RIPrivacyPassAuthorizedStorageServer,
 )
@@ -58,6 +70,7 @@ from .storage_common import (
     add_lease_message,
     renew_lease_message,
     slot_testv_and_readv_and_writev_message,
+    has_writes,
 )
 
 class MorePassesRequired(Exception):
@@ -85,6 +98,14 @@ class MorePassesRequired(Exception):
         return repr(self)
 
 
+class LeaseRenewalRequired(Exception):
+    """
+    Mutable write operations fail with ``LeaseRenewalRequired`` when the slot
+    which is the target of the write does not have an active lease and no
+    passes are supplied to create one.
+    """
+
+
 @implementer_only(RIPrivacyPassAuthorizedStorageServer, IReferenceable, IRemotelyCallable)
 # It would be great to use `frozen=True` (value-based hashing) instead of
 # `cmp=False` (identity based hashing) but Referenceable wants to set some
@@ -97,6 +118,10 @@ class ZKAPAuthorizerStorageServer(Referenceable):
     """
     _original = attr.ib(validator=provides(RIStorageServer))
     _signing_key = attr.ib(validator=instance_of(SigningKey))
+    _clock = attr.ib(
+        validator=provides(IReactorTime),
+        default=attr.Factory(partial(namedAny, "twisted.internet.reactor")),
+    )
 
     def _is_invalid_pass(self, message, pass_):
         """
@@ -154,12 +179,7 @@ class ZKAPAuthorizerStorageServer(Referenceable):
             allocate_buckets_message(storage_index),
             passes,
         )
-        required_pass_count = required_passes(BYTES_PER_PASS, sharenums, allocated_size)
-        if len(valid_passes) < required_pass_count:
-            raise MorePassesRequired(
-                len(valid_passes),
-                required_pass_count,
-            )
+        check_pass_quantity_for_write(len(valid_passes), sharenums, allocated_size)
 
         return self._original.remote_allocate_buckets(
             storage_index,
@@ -216,7 +236,26 @@ class ZKAPAuthorizerStorageServer(Referenceable):
             data in already-allocated storage.  These cases may not be the
             same from the perspective of pass validation.
         """
-        self._validate_passes(slot_testv_and_readv_and_writev_message(storage_index), passes)
+        renew_leases = False
+
+        if has_writes(tw_vectors):
+            # Writes are allowed to shares with active leases.
+            if not has_active_lease(
+                self._original,
+                storage_index,
+                self._clock.seconds(),
+            ):
+                # Passes may be supplied with the write to create the
+                # necessary lease as part of the same operation.  This must be
+                # supported because there is no separate protocol action to
+                # *create* a slot.  Clients just begin writing to it.
+                valid_passes = self._validate_passes(
+                    slot_testv_and_readv_and_writev_message(storage_index),
+                    passes,
+                )
+                check_pass_quantity_for_mutable_write(len(valid_passes), tw_vectors)
+                renew_leases = True
+
         # Skip over the remotely exposed method and jump to the underlying
         # implementation which accepts one additional parameter that we know
         # about (and don't expose over the network): renew_leases.  We always
@@ -227,7 +266,7 @@ class ZKAPAuthorizerStorageServer(Referenceable):
             secrets,
             tw_vectors,
             r_vector,
-            renew_leases=False,
+            renew_leases=renew_leases,
         )
 
     def remote_slot_readv(self, *a, **kw):
@@ -237,6 +276,97 @@ class ZKAPAuthorizerStorageServer(Referenceable):
         """
         return self._original.remote_slot_readv(*a, **kw)
 
+
+def get_sharenums(tw_vectors):
+    """
+    :param tw_vectors: See
+        ``allmydata.interfaces.TestAndWriteVectorsForShares``.
+
+    :return set[int]: The share numbers which the given test/write vectors would write to.
+    """
+    return set(
+        sharenum
+        for (sharenum, (test, data, new_length))
+        in tw_vectors.items()
+        if data
+    )
+
+
+def get_allocated_size(tw_vectors):
+    """
+    :param tw_vectors: See
+        ``allmydata.interfaces.TestAndWriteVectorsForShares``.
+
+    :return int: The largest position ``tw_vectors`` writes in any share.
+    """
+    return max(
+        list(
+            max(offset + len(s) for (offset, s) in data)
+            for (sharenum, (test, data, new_length))
+            in tw_vectors.items()
+            if data
+        ),
+    )
+
+
+def has_active_lease(storage_server, storage_index, now):
+    """
+    :param allmydata.storage.server.StorageServer storage_server: A storage
+        server to use to look up lease information.
+
+    :param bytes storage_index: A storage index to use to look up lease
+        information.
+
+    :param float now: The current time as a POSIX timestamp.
+
+    :return bool: ``True`` if any only if the given storage index has a lease
+        with an expiration time after ``now``.
+    """
+    leases = storage_server.get_slot_leases(storage_index)
+    return any(
+        lease.get_expiration_time() > now
+        for lease
+        in leases
+    )
+
+
+def check_pass_quantity_for_write(valid_count, sharenums, allocated_size):
+    """
+    Determine if the given number of valid passes is sufficient for an
+    attempted write.
+
+    :param int valid_count: The number of valid passes to consider.
+    :param set[int] sharenums: The shares being written to.
+    :param int allocated_size: The size of each share.
+
+    :raise MorePassedRequired: If the number of valid passes given is too
+        small.
+
+    :return: ``None`` if the number of valid passes given is sufficient.
+    """
+    required_pass_count = required_passes(BYTES_PER_PASS, sharenums, allocated_size)
+    if valid_count < required_pass_count:
+        raise MorePassesRequired(
+            valid_count,
+            required_pass_count,
+        )
+
+
+def check_pass_quantity_for_mutable_write(valid_count, tw_vectors):
+    """
+    Determine if the given number of valid passes is sufficient for an
+    attempted write to a slot.
+
+    :param int valid_count: The number of valid passes to consider.
+
+    :param tw_vectors: See
+        ``allmydata.interfaces.TestAndWriteVectorsForShares``.
+    """
+    sharenums = get_sharenums(tw_vectors)
+    allocated_size = get_allocated_size(tw_vectors)
+    check_pass_quantity_for_write(valid_count, sharenums, allocated_size)
+
+
 # I don't understand why this is required.
 # ZKAPAuthorizerStorageServer is-a Referenceable.  It seems like
 # the built in adapter should take care of this case.
diff --git a/src/_zkapauthorizer/api.py b/src/_zkapauthorizer/api.py
index 8b89611ba10e3ee3833f2d5c7c45d1d8365ee320..365e39a23026e1b7692267d12c895f45cfaeda64 100644
--- a/src/_zkapauthorizer/api.py
+++ b/src/_zkapauthorizer/api.py
@@ -14,6 +14,7 @@
 
 __all__ = [
     "MorePassesRequired",
+    "LeaseRenewalRequired",
     "ZKAPAuthorizerStorageServer",
     "ZKAPAuthorizerStorageClient",
     "ZKAPAuthorizer",
@@ -21,6 +22,7 @@ __all__ = [
 
 from ._storage_server import (
     MorePassesRequired,
+    LeaseRenewalRequired,
     ZKAPAuthorizerStorageServer,
 )
 from ._storage_client import (
diff --git a/src/_zkapauthorizer/storage_common.py b/src/_zkapauthorizer/storage_common.py
index ef1215d9dceacb3a28de204388e76dc604283ca9..5baacb22be69bb3e3473c3e7b8b22a7994320845 100644
--- a/src/_zkapauthorizer/storage_common.py
+++ b/src/_zkapauthorizer/storage_common.py
@@ -16,6 +16,10 @@
 Functionality shared between the storage client and server.
 """
 
+from __future__ import (
+    division,
+)
+
 from base64 import (
     b64encode,
 )
@@ -61,3 +65,17 @@ def required_passes(bytes_per_pass, share_nums, share_size):
             (len(share_nums) * share_size) / bytes_per_pass,
         ),
     )
+
+
+def has_writes(tw_vectors):
+    """
+    :param tw_vectors: See
+        ``allmydata.interfaces.TestAndWriteVectorsForShares``.
+
+    :return bool: ``True`` if any only if there are writes in ``tw_vectors``.
+    """
+    return any(
+        data
+        for (test, data, new_length)
+        in tw_vectors.values()
+    )
diff --git a/src/_zkapauthorizer/tests/strategies.py b/src/_zkapauthorizer/tests/strategies.py
index fbbb5c0e3085c1a171c8666154a1b46628edb48b..037d5a331a5ee6c74bd9f0cb41bdbbb54bb007ff 100644
--- a/src/_zkapauthorizer/tests/strategies.py
+++ b/src/_zkapauthorizer/tests/strategies.py
@@ -319,7 +319,9 @@ def sharenum_sets():
     return sets(
         sharenums(),
         min_size=1,
-        max_size=255,
+        # This could go as high as 255 but to avoid tripping over the limits
+        # discussed in sizes(), keep it smaller.
+        max_size=8,
     )
 
 
@@ -330,8 +332,10 @@ def sizes():
     return integers(
         # Size 0 data isn't data, it's nothing.
         min_value=1,
-        # Just for practical purposes...
-        max_value=2 ** 16,
+        # For the moment there are some assumptions in the test suite that
+        # limit us to an amount of storage that can be paid for with one ZKAP.
+        # That will be fixed eventually.  For now, keep the sizes pretty low.
+        max_value=2 ** 8,
     )
 
 
@@ -342,7 +346,7 @@ def offsets():
     return integers(
         min_value=0,
         # Just for practical purposes...
-        max_value=2 ** 16,
+        max_value=2 ** 8,
     )
 
 
diff --git a/src/_zkapauthorizer/tests/test_storage_protocol.py b/src/_zkapauthorizer/tests/test_storage_protocol.py
index 98320fdd1581c2a537bc505492c597ddceb0c3e8..d0fe8bb05ef9efb366a25ac88073f2999f83ba97 100644
--- a/src/_zkapauthorizer/tests/test_storage_protocol.py
+++ b/src/_zkapauthorizer/tests/test_storage_protocol.py
@@ -47,7 +47,6 @@ from testtools.twistedsupport._deferred import (
 from hypothesis import (
     given,
     assume,
-    note,
 )
 from hypothesis.strategies import (
     tuples,
@@ -140,18 +139,41 @@ class LocalRemote(object):
         )
 
 
+def assume_one_pass(test_and_write_vectors_for_shares):
+    """
+    Assume that the writes represented by the given ``TestAndWriteVectors``
+    will cost at most one pass.
+    """
+    from .._storage_server import (
+        BYTES_PER_PASS,
+        get_sharenums,
+        get_allocated_size,
+        required_passes,
+    )
+    tw_vectors = {k: v.for_call() for (k, v) in test_and_write_vectors_for_shares.items()}
+    sharenums = get_sharenums(tw_vectors)
+    allocated_size = get_allocated_size(tw_vectors)
+    assume(required_passes(BYTES_PER_PASS, sharenums, allocated_size) <= 1)
+
 
 class ShareTests(TestCase):
     """
     Tests for interaction with shares.
+
+    :ivar int spent_passes: The number of passes which have been spent so far
+        in the course of a single test (in the case of Hypothesis, every
+        iteration of the test so far, probably; so make relative comparisons
+        instead of absolute ones).
     """
     def setUp(self):
         super(ShareTests, self).setUp()
         self.canary = LocalReferenceable(None)
         self.anonymous_storage_server = self.useFixture(AnonymousStorageServer()).storage_server
         self.signing_key = random_signing_key()
+        self.spent_passes = 0
 
         def get_passes(message, count):
+            self.spent_passes += count
             return list(
                 Pass(pass_.decode("ascii"))
                 for pass_
@@ -194,6 +216,9 @@ class ShareTests(TestCase):
         resulting buckets can be read back using *get_buckets* and methods of
         those resulting buckets.
         """
+        # XXX
+        assume(len(sharenums) * size < 128 * 1024 * 10)
+
         # Hypothesis causes our storage server to be used many times.  Clean
         # up between iterations.
         cleanup_storage_server(self.anonymous_storage_server)
@@ -379,8 +404,11 @@ class ShareTests(TestCase):
     def test_create_mutable(self, storage_index, secrets, test_and_write_vectors_for_shares):
         """
         Mutable share data written using *slot_testv_and_readv_and_writev* can be
-        read back.
+        read back as-written and without spending any more passes.
         """
+        # XXX
+        assume_one_pass(test_and_write_vectors_for_shares)
+
         # Hypothesis causes our storage server to be used many times.  Clean
         # up between iterations.
         cleanup_storage_server(self.anonymous_storage_server)
@@ -397,53 +425,25 @@ class ShareTests(TestCase):
                 r_vector=[],
             ),
         )
-
         self.assertThat(
             wrote,
             Equals(True),
             u"Server rejected a write to a new mutable slot",
         )
-
         self.assertThat(
             read,
             Equals({}),
             u"Server gave back read results when we asked for none.",
         )
+        # Now we can read it back without spending any more passes.
+        before_spent_passes = self.spent_passes
+        assert_read_back_data(self, storage_index, secrets, test_and_write_vectors_for_shares)
+        after_spent_passes = self.spent_passes
+        self.assertThat(
+            before_spent_passes,
+            Equals(after_spent_passes),
+        )
 
-        for sharenum, vectors in test_and_write_vectors_for_shares.items():
-            r_vector = list(map(write_vector_to_read_vector, vectors.write_vector))
-            read = extract_result(
-                self.client.slot_readv(
-                    storage_index,
-                    shares=[sharenum],
-                    r_vector=r_vector,
-                ),
-            )
-            note("read vector {}".format(r_vector))
-            # Create a buffer and pile up all the write operations in it.
-            # This lets us make correct assertions about overlapping writes.
-            length = max(
-                offset + len(data)
-                for (offset, data)
-                in vectors.write_vector
-            )
-            expected = b"\x00" * length
-            for (offset, data) in vectors.write_vector:
-                expected = expected[:offset] + data + expected[offset + len(data):]
-            if vectors.new_length is not None and vectors.new_length < length:
-                expected = expected[:vectors.new_length]
-            self.assertThat(
-                read,
-                Equals({sharenum: list(
-                    # Get the expected value out of our scratch buffer.
-                    expected[offset:offset + len(data)]
-                    for (offset, data)
-                    in vectors.write_vector
-                )}),
-                u"Server didn't reliably read back data just written for share {}".format(
-                    sharenum,
-                ),
-            )
     @given(
         storage_index=storage_indexes(),
         secrets=tuples(
@@ -453,38 +453,63 @@ class ShareTests(TestCase):
         ),
         test_and_write_vectors_for_shares=test_and_write_vectors_for_shares(),
     )
-    def test_mutable_write_preserves_lease(self, storage_index, secrets, test_and_write_vectors_for_shares):
+    def test_mutable_rewrite_preserves_lease(self, storage_index, secrets, test_and_write_vectors_for_shares):
         """
-        When mutable share data is written using *slot_testv_and_readv_and_writev*
-        any leases on the corresponding slot remain the same.
+        When mutable share data is rewritten using
+        *slot_testv_and_readv_and_writev* any leases on the corresponding slot
+        remain the same.
         """
+        # XXX
+        assume_one_pass(test_and_write_vectors_for_shares)
+
         # Hypothesis causes our storage server to be used many times.  Clean
         # up between iterations.
         cleanup_storage_server(self.anonymous_storage_server)
 
-        wrote, read = extract_result(
-            self.client.slot_testv_and_readv_and_writev(
-                storage_index,
-                secrets=secrets,
-                tw_vectors={
-                    k: v.for_call()
-                    for (k, v)
-                    in test_and_write_vectors_for_shares.items()
-                },
-                r_vector=[],
-            ),
-        )
+        def leases():
+            return list(
+                lease.to_mutable_data()
+                for lease
+                in self.anonymous_storage_server.get_slot_leases(storage_index)
+            )
+
+        def write():
+            return extract_result(
+                self.client.slot_testv_and_readv_and_writev(
+                    storage_index,
+                    secrets=secrets,
+                    tw_vectors={
+                        k: v.for_call()
+                        for (k, v)
+                        in test_and_write_vectors_for_shares.items()
+                    },
+                    r_vector=[],
+                ),
+            )
 
+        # Perform an initial write so there is something to rewrite.
+        wrote, read = write()
         self.assertThat(
             wrote,
             Equals(True),
             u"Server rejected a write to a new mutable slot",
         )
 
-        # There are *no* leases on this newly written slot!
+        # Note the prior state.
+        leases_before = leases()
+
+        # Now perform the rewrite.
+        wrote, read = write()
         self.assertThat(
-            list(self.anonymous_storage_server.get_slot_leases(storage_index)),
-            Equals([]),
+            wrote,
+            Equals(True),
+            u"Server rejected rewrite of an existing mutable slot",
+        )
+
+        # Leases are exactly unchanged.
+        self.assertThat(
+            leases(),
+            Equals(leases_before),
         )
 
     @given(
@@ -568,6 +593,56 @@ class ShareTests(TestCase):
         )
 
 
+def assert_read_back_data(self, storage_index, secrets, test_and_write_vectors_for_shares):
+    """
+    Assert that the data written by ``test_and_write_vectors_for_shares`` can
+    be read back from ``storage_index``.
+
+    :param ShareTests self: The test case which performed the write and can be
+        used for assertions.
+
+    :param bytes storage_index: The storage index where the data should be
+        found.
+
+    :raise: A test-failing assertion if the data cannot be read back.
+    """
+    # Create a buffer and pile up all the write operations in it.
+    # This lets us make correct assertions about overlapping writes.
+    for sharenum, vectors in test_and_write_vectors_for_shares.items():
+        length = max(
+            offset + len(data)
+            for (offset, data)
+            in vectors.write_vector
+        )
+        expected = b"\x00" * length
+        for (offset, data) in vectors.write_vector:
+            expected = expected[:offset] + data + expected[offset + len(data):]
+        if vectors.new_length is not None and vectors.new_length < length:
+            expected = expected[:vectors.new_length]
+
+        expected_result = list(
+            # Get the expected value out of our scratch buffer.
+            expected[offset:offset + len(data)]
+            for (offset, data)
+            in vectors.write_vector
+        )
+
+        _, single_read = extract_result(
+            self.client.slot_testv_and_readv_and_writev(
+                storage_index,
+                secrets=secrets,
+                tw_vectors={},
+                r_vector=list(map(write_vector_to_read_vector, vectors.write_vector)),
+            ),
+        )
+
+        self.assertThat(
+            single_read[sharenum],
+            Equals(expected_result),
+            u"Server didn't reliably read back data just written",
+        )
+
+
 def write_vector_to_read_vector(write_vector):
     """
     Create a read vector which will read back the data written by the given
diff --git a/src/_zkapauthorizer/tests/test_storage_server.py b/src/_zkapauthorizer/tests/test_storage_server.py
index ad8ba0eb5fcaa615cfb39b7b7056c963295d7048..1ef1186984c36bf5b29bfb00c4fac34bf14a6508 100644
--- a/src/_zkapauthorizer/tests/test_storage_server.py
+++ b/src/_zkapauthorizer/tests/test_storage_server.py
@@ -38,6 +38,7 @@ from hypothesis import (
 from hypothesis.strategies import (
     integers,
     lists,
+    tuples,
 )
 from privacypass import (
     RandomToken,
@@ -52,6 +53,10 @@ from .privacypass import (
 )
 from .strategies import (
     zkaps,
+    storage_indexes,
+    write_enabler_secrets,
+    lease_renew_secrets,
+    lease_cancel_secrets,
 )
 from .fixtures import (
     AnonymousStorageServer,
@@ -137,3 +142,48 @@ class PassValidationTests(TestCase):
             allocate_buckets,
             raises(MorePassesRequired),
         )
+
+
+    @given(
+        storage_index=storage_indexes(),
+        secrets=tuples(
+            write_enabler_secrets(),
+            lease_renew_secrets(),
+            lease_cancel_secrets(),
+        ),
+    )
+    def test_create_mutable_fails_without_passes(self, storage_index, secrets):
+        """
+        If ``remote_slot_testv_and_readv_and_writev`` is invoked to perform
+        initial writes on shares without supplying passes, the operation fails
+        with ``LeaseRenewalRequired``.
+        """
+        data = b"01234567"
+        offset = 0
+        sharenum = 0
+        mutable_write = lambda: self.storage_server.doRemoteCall(
+            "slot_testv_and_readv_and_writev",
+            (),
+            dict(
+                passes=[],
+                storage_index=storage_index,
+                secrets=secrets,
+                tw_vectors={
+                    sharenum: ([], [(offset, data)], None),
+                },
+                r_vector=[],
+            ),
+        )
+
+        try:
+            result = mutable_write()
+        except MorePassesRequired as e:
+            self.assertThat(
+                e.required_count,
+                Equals(1),
+            )
+        else:
+            self.fail("expected LeaseRenewalRequired, got {}".format(result))
+
+    # TODO
+    # a write that increases the storage cost of the share requires passes too