diff --git a/src/_zkapauthorizer/_storage_server.py b/src/_zkapauthorizer/_storage_server.py
index 2efd0ffcdf24393503a4448b6c6e2b37f6569841..1b541131867dab2371222a366dc38fe44d6d9c92 100644
--- a/src/_zkapauthorizer/_storage_server.py
+++ b/src/_zkapauthorizer/_storage_server.py
@@ -468,8 +468,6 @@ class ZKAPAuthorizerStorageServer(Referenceable):
             renew_leases=False,
         )
 
-        self._metric_spending_successes.observe(required_new_passes)
-
         # Add the leases that we charged the client for.  This includes:
         #
         #  - leases on newly created shares
@@ -483,6 +481,9 @@ class ZKAPAuthorizerStorageServer(Referenceable):
         # somewhat.
         add_leases_for_writev(self._original, storage_index, secrets, tw_vectors, now)
 
+        # The operation has fully succeeded.
+        self._metric_spending_successes.observe(required_new_passes)
+
         # Propagate the result of the operation.
         return result
 
diff --git a/src/_zkapauthorizer/tests/test_storage_server.py b/src/_zkapauthorizer/tests/test_storage_server.py
index f303459f0cd7b8b36e7039bbcab68ea7f6c311f0..137075b29210d5bbfa619559e997b56f8b9d8120 100644
--- a/src/_zkapauthorizer/tests/test_storage_server.py
+++ b/src/_zkapauthorizer/tests/test_storage_server.py
@@ -21,6 +21,7 @@ from __future__ import absolute_import, division
 from random import shuffle
 from time import time
 
+from allmydata.storage.mutable import MutableShareFile
 from challenge_bypass_ristretto import RandomToken, random_signing_key
 from foolscap.referenceable import LocalReferenceable
 from hypothesis import given, note
@@ -583,7 +584,6 @@ class PassValidationTests(TestCase):
             list(RandomToken.create() for i in range(num_passes)),
         )
 
-        # Create an initial share to toy with.
         test, read = self.storage_server.doRemoteCall(
             "slot_testv_and_readv_and_writev",
             (),
@@ -610,6 +610,73 @@ class PassValidationTests(TestCase):
             "Unexpected histogram bucket value",
         )
 
+    @given(
+        storage_index=storage_indexes(),
+        secrets=tuples(
+            write_enabler_secrets(),
+            lease_renew_secrets(),
+            lease_cancel_secrets(),
+        ),
+        test_and_write_vectors_for_shares=slot_test_and_write_vectors_for_shares(),
+    )
+    def test_mutable_failure_spending_metrics(
+        self,
+        storage_index,
+        secrets,
+        test_and_write_vectors_for_shares,
+    ):
+        """
+        If a mutable storage operation fails then the successful pass spending
+        metric is not incremented.
+        """
+        tw_vectors = {
+            k: v.for_call() for (k, v) in test_and_write_vectors_for_shares.items()
+        }
+        num_passes = get_required_new_passes_for_mutable_write(
+            self.pass_value,
+            dict.fromkeys(tw_vectors.keys(), 0),
+            tw_vectors,
+        )
+        valid_passes = make_passes(
+            self.signing_key,
+            slot_testv_and_readv_and_writev_message(storage_index),
+            list(RandomToken.create() for i in range(num_passes)),
+        )
+
+        # The very last step of a mutable write is the lease renewal step.
+        # We'll break that part to be sure metrics are only recorded after
+        # that (ie, after the operation has completely succeeded).  It's not
+        # easy to break that operation so we reach into some private guts to
+        # do so...  After we upgrade to Tahoe 1.17.0 then we can mess around
+        # with `reserved_space` to make Tahoe think there's no room for the
+        # leases and fail the operation, perhaps (but how to do that without
+        # making the earlier storage-allocating part of the operation fail?).
+        self.patch(MutableShareFile, "add_or_renew_lease", lambda *a, **kw: 1 / 0)
+
+        try:
+            test, read = self.storage_server.doRemoteCall(
+                "slot_testv_and_readv_and_writev",
+                (),
+                dict(
+                    passes=valid_passes,
+                    storage_index=storage_index,
+                    secrets=secrets,
+                    tw_vectors=tw_vectors,
+                    r_vector=[],
+                ),
+            )
+        except ZeroDivisionError:
+            pass
+        else:
+            self.fail("expected our ZeroDivisionError to be raised")
+
+        after_count = read_count(self.storage_server)
+        self.expectThat(
+            after_count,
+            Equals(0),
+            "Expected no successful spending to be recorded in error case",
+        )
+
     @given(
         storage_index=storage_indexes(),
         renew_secret=lease_renew_secrets(),
@@ -754,7 +821,9 @@ class PassValidationTests(TestCase):
         sharenums=sharenum_sets(),
         allocated_size=sizes(),
     )
-    def test_add_lease_metrics_on_failure(self, storage_index, renew_secret, cancel_secret, sharenums, allocated_size):
+    def test_add_lease_metrics_on_failure(
+        self, storage_index, renew_secret, cancel_secret, sharenums, allocated_size
+    ):
         """
         If the ``add_lease`` operation fails then the successful pass spending
         metric is not incremented.