diff --git a/setup.cfg b/setup.cfg
index ebbd1ac295db4e0950f28815949e5900618e59b1..343668b5ab5e883c41f40fc31fd40c1f152ce0c3 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -48,6 +48,7 @@ install_requires =
     tahoe-lafs @ https://github.com/tahoe-lafs/tahoe-lafs/archive/d3c6f58a8ded7db3324ef97c47f5c1921c3d58b7.zip
     treq
     pyutil
+    prometheus-client
 
 [options.extras_require]
 test = coverage; fixtures; testtools; hypothesis
diff --git a/src/_zkapauthorizer/_plugin.py b/src/_zkapauthorizer/_plugin.py
index 8ae7fbeb74d5c01a70a2d54bc6f36d78b1b98958..503d43a34a9cbafafc5e4a84f0d4405f39b734c6 100644
--- a/src/_zkapauthorizer/_plugin.py
+++ b/src/_zkapauthorizer/_plugin.py
@@ -17,16 +17,26 @@ The Twisted plugin that glues the Zero-Knowledge Access Pass system into
 Tahoe-LAFS.
 """
 
+from __future__ import absolute_import
+
 import random
 from datetime import datetime
 from functools import partial
 from weakref import WeakValueDictionary
 
+try:
+    from typing import Callable
+except ImportError:
+    pass
+
 import attr
 from allmydata.client import _Client
 from allmydata.interfaces import IAnnounceableStorageServer, IFoolscapStoragePlugin
 from allmydata.node import MissingConfigEntry
 from challenge_bypass_ristretto import SigningKey
+from eliot import start_action
+from prometheus_client import CollectorRegistry, write_to_textfile
+from twisted.internet import task
 from twisted.internet.defer import succeed
 from twisted.logger import Logger
 from twisted.python.filepath import FilePath
@@ -95,8 +105,23 @@ class ZKAPAuthorizer(object):
         """
         return get_redeemer(self.name, node_config, announcement, reactor)
 
-    def get_storage_server(self, configuration, get_anonymous_storage_server):
+    def get_storage_server(
+        self, configuration, get_anonymous_storage_server, reactor=None
+    ):
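+        """
+        Create a ZKAP-enforcing wrapper around the given anonymous storage
+        server and, if the configuration asks for it, arrange for Prometheus
+        metrics to be written to a file periodically.
+        """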
+        if reactor is None:
+            from twisted.internet import reactor
+        registry = CollectorRegistry()
         kwargs = configuration.copy()
+
+        # If metrics are desired, schedule their writing to disk.
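+        # The interval is a number of seconds between writes and the path
+        # names the metrics text file (for example a ``.prom`` file) to write.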
+        metrics_interval = kwargs.pop(u"prometheus-metrics-interval", None)
+        metrics_path = kwargs.pop(u"prometheus-metrics-path", None)
+        if metrics_interval is not None and metrics_path is not None:
+            FilePath(metrics_path).parent().makedirs(ignoreExistingDirectory=True)
+            t = task.LoopingCall(make_safe_writer(metrics_path, registry))
+            t.clock = reactor
+            t.start(int(metrics_interval))
+
         root_url = kwargs.pop(u"ristretto-issuer-root-url")
         pass_value = int(kwargs.pop(u"pass-value", BYTES_PER_PASS))
         signing_key = load_signing_key(
@@ -111,6 +136,7 @@ class ZKAPAuthorizer(object):
             get_anonymous_storage_server(),
             pass_value=pass_value,
             signing_key=signing_key,
+            registry=registry,
             **kwargs
         )
         return succeed(
@@ -158,6 +184,27 @@ class ZKAPAuthorizer(object):
         )
 
 
+def make_safe_writer(metrics_path, registry):
+    # type: (str, CollectorRegistry) -> Callable[[], None]
+    """
+    Make a no-argument callable that writes metrics from the given registry to
+    the given path.  The callable will log errors writing to the path and not
+    raise exceptions.
+    """
+
+    def safe_writer():
+        try:
+            with start_action(
+                action_type=u"zkapauthorizer:metrics:write-to-textfile",
+                metrics_path=metrics_path,
+            ):
+                write_to_textfile(metrics_path, registry)
+        except Exception:
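+            # The enclosing eliot action has already recorded the failure.
+            # Swallow the exception so the LoopingCall driving this writer
+            # keeps running.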
+            pass
+
+    return safe_writer
+
+
 _init_storage = _Client.__dict__["init_storage"]
 
 
diff --git a/src/_zkapauthorizer/_storage_server.py b/src/_zkapauthorizer/_storage_server.py
index da6c959cb4bc3bd22b7c48e1fb5fdbb3cf7a71cb..1b541131867dab2371222a366dc38fe44d6d9c92 100644
--- a/src/_zkapauthorizer/_storage_server.py
+++ b/src/_zkapauthorizer/_storage_server.py
@@ -41,6 +41,7 @@ from attr.validators import instance_of, provides
 from challenge_bypass_ristretto import SigningKey, TokenPreimage, VerificationSignature
 from eliot import log_call, start_action
 from foolscap.api import Referenceable
+from prometheus_client import CollectorRegistry, Histogram
 from twisted.internet.defer import Deferred
 from twisted.internet.interfaces import IReactorTime
 from twisted.python.filepath import FilePath
@@ -64,6 +65,19 @@ try:
 except ImportError:
     pass
 
+# The last Python 2-supporting prometheus_client nevertheless tries to use
+# FileNotFoundError, an exception type from Python 3.  Since that release,
+# prometheus_client has dropped Python 2 support entirely so there is little
+# hope of ever having this fixed upstream.  When ZKAPAuthorizer is ported to
+# Python 3, this should no longer be necessary.
+def _prometheus_client_fix():
+    import prometheus_client.exposition
+
+    prometheus_client.exposition.FileNotFoundError = IOError
+
+
+_prometheus_client_fix()
+
 # See allmydata/storage/mutable.py
 SLOT_HEADER_SIZE = 468
 LEASE_TRAILER_SIZE = 4
@@ -177,10 +191,52 @@ class ZKAPAuthorizerStorageServer(Referenceable):
     _original = attr.ib(validator=provides(RIStorageServer))
     _pass_value = pass_value_attribute()
     _signing_key = attr.ib(validator=instance_of(SigningKey))
+    _registry = attr.ib(
+        default=attr.Factory(CollectorRegistry),
+        validator=attr.validators.instance_of(CollectorRegistry),
+    )
     _clock = attr.ib(
         validator=provides(IReactorTime),
         default=attr.Factory(partial(namedAny, "twisted.internet.reactor")),
     )
+    _metric_spending_successes = attr.ib(init=False)
+
+    def _get_spending_histogram_buckets(self):
+        """
+        Create the upper bounds for the ZKAP spending histogram.
+        """
+        # We want a lot of small buckets to be able to get an idea of how much
+        # spending is for tiny files where our billing system doesn't work
+        # extremely well.  We also want some large buckets so we have a point
+        # of comparison - is there a lot more or less spending on big files
+        # than small files?  Prometheus recommends a metric have a maximum
+        # cardinality below 10
+        # (<https://prometheus.io/docs/practices/instrumentation/#do-not-overuse-labels>).
+        # Histograms are implemented with labels so the cardinality is equal
+        # to the number of buckets.  We will push this a little bit so we can
+        # span a better range.  The good news is that this is a static
+        # cardinality (it does not change based on the data observed) so we
+        # are not at risk of blowing up the metrics overhead unboundedly.  11
+        # finite buckets + 1 infinite bucket covers 1 to 1024 ZKAPs (plus
+        # infinity) and only needs 12 buckets.
+        return list(2 ** n for n in range(11)) + [float("inf")]
+
+    @_metric_spending_successes.default
+    def _make_histogram(self):
+        return Histogram(
+            "zkapauthorizer_server_spending_successes",
+            "ZKAP Spending Successes histogram",
+            registry=self._registry,
+            buckets=self._get_spending_histogram_buckets(),
+        )
+
+    def _clear_metrics(self):
+        """
+        Forget all recorded metrics.
+        """
+        # There is also a `clear` method but it's for something else.  See
+        # https://github.com/prometheus/client_python/issues/707
+        self._metric_spending_successes._metric_init()
 
     def remote_get_version(self):
         """
@@ -244,6 +300,22 @@ class ZKAPAuthorizerStorageServer(Referenceable):
             allocated_size,
             renew_leases=False,
         )
+
+        # We just committed to spending some of the presented passes.  If
+        # `alreadygot` is not empty then we didn't commit to spending *all* of
+        # them.  (Also, we didn't *accept* data for storage yet - but that's a
+        # defect in the spending protocol and metrics can't fix it so just
+        # ignore that for now.)
+        #
+        # This expression mirrors the expression the client uses to determine
+        # how many passes were spent when it processes the result we return to
+        # it.
+        spent_passes = required_passes(
+            self._pass_value,
+            [allocated_size] * len(bucketwriters),
+        )
+        self._metric_spending_successes.observe(spent_passes)
+
         # Copy/paste the disconnection handling logic from
         # StorageServer.remote_allocate_buckets.
         for bw in bucketwriters.values():
@@ -252,6 +324,7 @@ class ZKAPAuthorizerStorageServer(Referenceable):
                 canary,
                 disconnect_marker,
             )
+
         return alreadygot, bucketwriters
 
     def remote_get_buckets(self, storage_index):
@@ -277,7 +350,9 @@ class ZKAPAuthorizerStorageServer(Referenceable):
             validation,
             self._original,
         )
-        return self._original.remote_add_lease(storage_index, *a, **kw)
+        result = self._original.remote_add_lease(storage_index, *a, **kw)
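+        # Observe the spending metric only after the lease has actually been
+        # added.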
+        self._metric_spending_successes.observe(len(validation.valid))
+        return result
 
     def remote_advise_corrupt_share(self, *a, **kw):
         """
@@ -406,6 +481,9 @@ class ZKAPAuthorizerStorageServer(Referenceable):
         # somewhat.
         add_leases_for_writev(self._original, storage_index, secrets, tw_vectors, now)
 
+        # The operation has fully succeeded.
+        self._metric_spending_successes.observe(required_new_passes)
+
         # Propagate the result of the operation.
         return result
 
@@ -442,6 +520,7 @@ def check_pass_quantity(pass_value, validation, share_sizes):
 def check_pass_quantity_for_lease(
     pass_value, storage_index, validation, storage_server
 ):
+    # type: (int, bytes, _ValidationResult, ZKAPAuthorizerStorageServer) -> Dict[int, int]
     """
     Check that the given number of passes is sufficient to add or renew a
     lease for one period for the given storage index.
@@ -453,7 +532,8 @@ def check_pass_quantity_for_lease(
     :raise MorePassesRequired: If the given number of passes is too few for
         the share sizes at the given storage index.
 
-    :return: ``None`` if the given number of passes is sufficient.
+    :return: A mapping from share number to share size on the server if the
+        number of passes given is sufficient.
     """
     allocated_sizes = dict(
         get_share_sizes(
@@ -461,8 +541,9 @@ def check_pass_quantity_for_lease(
             storage_index,
             list(get_all_share_numbers(storage_server, storage_index)),
         ),
-    ).values()
-    check_pass_quantity(pass_value, validation, allocated_sizes)
+    )
+    check_pass_quantity(pass_value, validation, allocated_sizes.values())
+    return allocated_sizes
 
 
 def check_pass_quantity_for_write(pass_value, validation, sharenums, allocated_size):
diff --git a/src/_zkapauthorizer/storage_common.py b/src/_zkapauthorizer/storage_common.py
index 9ecefe6e5e9c514c1c863fec093c449aa1cab4b8..bbe326a292b502024eef0b77b4181977c4f2509d 100644
--- a/src/_zkapauthorizer/storage_common.py
+++ b/src/_zkapauthorizer/storage_common.py
@@ -155,7 +155,7 @@ def required_passes(bytes_per_pass, share_sizes):
             ),
         )
     result, b = divmod(sum(share_sizes, 0), bytes_per_pass)
-    if b:
+    if b > 0:
         result += 1
 
     # print("required_passes({}, {}) == {}".format(bytes_per_pass, share_sizes, result))
diff --git a/src/_zkapauthorizer/tests/test_plugin.py b/src/_zkapauthorizer/tests/test_plugin.py
index 0e87c89cae5e7cb2f714f582e6f9dcc1f37d088d..f300d3a6ece15d8ba48479bb437c300a2057fc0d 100644
--- a/src/_zkapauthorizer/tests/test_plugin.py
+++ b/src/_zkapauthorizer/tests/test_plugin.py
@@ -18,6 +18,7 @@ Tests for the Tahoe-LAFS plugin.
 
 from __future__ import absolute_import
 
+from datetime import timedelta
 from functools import partial
 from os import makedirs
 
@@ -35,7 +36,9 @@ from foolscap.broker import Broker
 from foolscap.ipb import IReferenceable, IRemotelyCallable
 from foolscap.referenceable import LocalReferenceable
 from hypothesis import given, settings
-from hypothesis.strategies import datetimes, just, sampled_from
+from hypothesis.strategies import datetimes, just, sampled_from, timedeltas
+from prometheus_client import Gauge
+from prometheus_client.parser import text_string_to_metric_families
 from StringIO import StringIO
 from testtools import TestCase
 from testtools.content import text_content
@@ -43,6 +46,7 @@ from testtools.matchers import (
     AfterPreprocessing,
     AllMatch,
     Always,
+    AnyMatch,
     Contains,
     ContainsDict,
     Equals,
@@ -50,12 +54,15 @@ from testtools.matchers import (
     IsInstance,
     Matcher,
     MatchesAll,
+    MatchesListwise,
     MatchesStructure,
 )
 from testtools.twistedsupport import succeeded
+from testtools.twistedsupport._deferred import extract_result
 from twisted.internet.task import Clock
 from twisted.plugin import getPlugins
 from twisted.python.filepath import FilePath
+from twisted.python.runtime import platform
 from twisted.test.proto_helpers import StringTransport
 from twisted.web.resource import IResource
 
@@ -68,6 +75,7 @@ from ..foolscap import RIPrivacyPassAuthorizedStorageServer
 from ..lease_maintenance import SERVICE_NAME, LeaseMaintenanceConfig
 from ..model import NotEnoughTokens, VoucherStore
 from ..spending import GET_PASSES
+from .common import skipIf
 from .eliot import capture_logging
 from .foolscap import DummyReferenceable, LocalRemote, get_anonymous_storage_server
 from .matchers import Provides, raises
@@ -75,6 +83,7 @@ from .strategies import (
     announcements,
     client_dummyredeemer_configurations,
     client_lease_maintenance_configurations,
+    clocks,
     dummy_ristretto_keys,
     lease_cancel_secrets,
     lease_maintenance_configurations,
@@ -174,6 +183,7 @@ class PluginTests(TestCase):
         )
 
 
+@skipIf(platform.isWindows(), "Storage server is not supported on Windows")
 class ServerPluginTests(TestCase):
     """
     Tests for the plugin's implementation of
@@ -264,6 +274,68 @@ class ServerPluginTests(TestCase):
             ),
         )
 
+    @given(timedeltas(min_value=timedelta(seconds=1)), clocks())
+    def test_metrics_written(self, metrics_interval, clock):
+        """
+        When the configuration says where to put a metrics .prom file and how
+        often to write it, metrics are actually written there once the
+        configured interval has elapsed.
+        """
+        metrics_path = self.useFixture(TempDir()).join(u"metrics")
+        configuration = {
+            u"prometheus-metrics-path": metrics_path,
+            u"prometheus-metrics-interval": str(int(metrics_interval.total_seconds())),
+            u"ristretto-issuer-root-url": "foo",
+            u"ristretto-signing-key-path": SIGNING_KEY_PATH.path,
+        }
+        announceable = extract_result(
+            storage_server.get_storage_server(
+                configuration,
+                get_anonymous_storage_server,
+                reactor=clock,
+            )
+        )
+        registry = announceable.storage_server._registry
+
+        g = Gauge("foo", "bar", registry=registry)
+        for i in range(2):
+            g.set(i)
+
+            clock.advance(metrics_interval.total_seconds())
+            self.assertThat(
+                metrics_path,
+                has_metric(Equals("foo"), Equals(i)),
+            )
+
+
+def has_metric(name_matcher, value_matcher):
+    """
+    Create a matcher that matches a path containing serialized metrics which
+    include at least one metric matched by the given ``name_matcher`` and
+    ``value_matcher``.
+    """
+
+    def read_metrics(path):
+        with open(path) as f:
+            return list(text_string_to_metric_families(f.read()))
+
+    return AfterPreprocessing(
+        read_metrics,
+        AnyMatch(
+            MatchesStructure(
+                name=name_matcher,
+                samples=MatchesListwise(
+                    [
+                        MatchesStructure(
+                            name=name_matcher,
+                            value=value_matcher,
+                        ),
+                    ]
+                ),
+            ),
+        ),
+    )
+
 
 tahoe_configs_with_dummy_redeemer = tahoe_configs(client_dummyredeemer_configurations())
 
diff --git a/src/_zkapauthorizer/tests/test_storage_server.py b/src/_zkapauthorizer/tests/test_storage_server.py
index cd0eb20d52e70db098875fa6a9ea48c4b26deb5c..ad570cb0205f092566c7773a2f04c60e74a215e6 100644
--- a/src/_zkapauthorizer/tests/test_storage_server.py
+++ b/src/_zkapauthorizer/tests/test_storage_server.py
@@ -21,6 +21,7 @@ from __future__ import absolute_import, division
 from random import shuffle
 from time import time
 
+from allmydata.storage.mutable import MutableShareFile
 from challenge_bypass_ristretto import RandomToken, random_signing_key
 from foolscap.referenceable import LocalReferenceable
 from hypothesis import given, note
@@ -139,6 +140,42 @@ class ValidationResultTests(TestCase):
             )
 
 
+def read_spending_success_histogram_total(storage_server):
+    # type: (ZKAPAuthorizerStorageServer) -> int
+    """
+    Read the total number of values across all buckets of the spending success
+    metric histogram.
+    """
+    # Reading _buckets seems like the least bad option for now.  See
+    # https://github.com/prometheus/client_python/issues/736 though.
+    buckets = storage_server._metric_spending_successes._buckets
+    return sum(b.get() for b in buckets)
+
+
+def read_spending_success_histogram_bucket(storage_server, num_passes):
+    # type: (ZKAPAuthorizerStorageServer, int) -> int
+    """
+    Read the value of a single bucket of the spending success metric
+    histogram.
+
+    :param num_passes: A pass spending count which selects the bucket to
+        read: the first bucket whose upper bound is at or above this count.
+    """
+    bounds = storage_server._get_spending_histogram_buckets()
+    for bucket_number, upper_bound in enumerate(bounds):
+        if num_passes <= upper_bound:
+            break
+
+    note("bucket_number {}".format(bucket_number))
+    # See note above about reading private _buckets attribute.
+    buckets = storage_server._metric_spending_successes._buckets
+    note(
+        "bucket counters: {}".format(list((n, b.get()) for n, b in enumerate(buckets)))
+    )
+    return buckets[bucket_number].get()
+
+
 class PassValidationTests(TestCase):
     """
     Tests for pass validation performed by ``ZKAPAuthorizerStorageServer``.
@@ -162,7 +199,7 @@ class PassValidationTests(TestCase):
             self.anonymous_storage_server,
             self.pass_value,
             self.signing_key,
-            self.clock,
+            clock=self.clock,
         )
 
     def setup_example(self):
@@ -180,6 +217,10 @@ class PassValidationTests(TestCase):
         # method.
         cleanup_storage_server(self.anonymous_storage_server)
 
+        # Reset all of the metrics, too, so the individual tests have a
+        # simpler job (can compare values relative to 0).
+        self.storage_server._clear_metrics()
+
     def test_allocate_buckets_fails_without_enough_passes(self):
         """
         ``remote_allocate_buckets`` fails with ``MorePassesRequired`` if it is
@@ -454,6 +495,12 @@ class PassValidationTests(TestCase):
                     ),
                 ),
             )
+            # Since it was not successful, the successful spending metric
+            # hasn't changed.
+            self.assertThat(
+                read_spending_success_histogram_total(self.storage_server),
+                Equals(0),
+            )
         else:
             self.fail("Expected MorePassesRequired, got {}".format(result))
 
@@ -524,3 +571,332 @@ class PassValidationTests(TestCase):
             actual_sizes,
             Equals(expected_sizes),
         )
+
+    @given(
+        storage_index=storage_indexes(),
+        secrets=tuples(
+            write_enabler_secrets(),
+            lease_renew_secrets(),
+            lease_cancel_secrets(),
+        ),
+        test_and_write_vectors_for_shares=slot_test_and_write_vectors_for_shares(),
+    )
+    def test_mutable_spending_metrics(
+        self,
+        storage_index,
+        secrets,
+        test_and_write_vectors_for_shares,
+    ):
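+        """
+        When ZKAPs are spent to call *slot_testv_and_readv_and_writev* the
+        number of passes spent is recorded as a metric.
+        """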
+        tw_vectors = {
+            k: v.for_call() for (k, v) in test_and_write_vectors_for_shares.items()
+        }
+        num_passes = get_required_new_passes_for_mutable_write(
+            self.pass_value,
+            dict.fromkeys(tw_vectors.keys(), 0),
+            tw_vectors,
+        )
+        valid_passes = make_passes(
+            self.signing_key,
+            slot_testv_and_readv_and_writev_message(storage_index),
+            list(RandomToken.create() for i in range(num_passes)),
+        )
+
+        test, read = self.storage_server.doRemoteCall(
+            "slot_testv_and_readv_and_writev",
+            (),
+            dict(
+                passes=valid_passes,
+                storage_index=storage_index,
+                secrets=secrets,
+                tw_vectors=tw_vectors,
+                r_vector=[],
+            ),
+        )
+
+        after_count = read_spending_success_histogram_total(self.storage_server)
+        after_bucket = read_spending_success_histogram_bucket(
+            self.storage_server, num_passes
+        )
+
+        self.expectThat(
+            after_count,
+            Equals(1),
+            "Unexpected histogram sum value",
+        )
+        self.assertThat(
+            after_bucket,
+            Equals(1),
+            "Unexpected histogram bucket value",
+        )
+
+    @given(
+        storage_index=storage_indexes(),
+        secrets=tuples(
+            write_enabler_secrets(),
+            lease_renew_secrets(),
+            lease_cancel_secrets(),
+        ),
+        test_and_write_vectors_for_shares=slot_test_and_write_vectors_for_shares(),
+    )
+    def test_mutable_failure_spending_metrics(
+        self,
+        storage_index,
+        secrets,
+        test_and_write_vectors_for_shares,
+    ):
+        """
+        If a mutable storage operation fails then the successful pass spending
+        metric is not incremented.
+        """
+        tw_vectors = {
+            k: v.for_call() for (k, v) in test_and_write_vectors_for_shares.items()
+        }
+        num_passes = get_required_new_passes_for_mutable_write(
+            self.pass_value,
+            dict.fromkeys(tw_vectors.keys(), 0),
+            tw_vectors,
+        )
+        valid_passes = make_passes(
+            self.signing_key,
+            slot_testv_and_readv_and_writev_message(storage_index),
+            list(RandomToken.create() for i in range(num_passes)),
+        )
+
+        # The very last step of a mutable write is the lease renewal step.
+        # We'll break that part to be sure metrics are only recorded after
+        # that (i.e., after the operation has completely succeeded).  It's not
+        # easy to break that operation so we reach into some private guts to
+        # do so...  After we upgrade to Tahoe 1.17.0 we can mess around
+        # with `reserved_space` to make Tahoe think there's no room for the
+        # leases and fail the operation, perhaps (but how to do that without
+        # making the earlier storage-allocating part of the operation fail?).
+        self.patch(MutableShareFile, "add_or_renew_lease", lambda *a, **kw: 1 / 0)
+
+        try:
+            test, read = self.storage_server.doRemoteCall(
+                "slot_testv_and_readv_and_writev",
+                (),
+                dict(
+                    passes=valid_passes,
+                    storage_index=storage_index,
+                    secrets=secrets,
+                    tw_vectors=tw_vectors,
+                    r_vector=[],
+                ),
+            )
+        except ZeroDivisionError:
+            pass
+        else:
+            self.fail("expected our ZeroDivisionError to be raised")
+
+        after_count = read_spending_success_histogram_total(self.storage_server)
+        self.expectThat(
+            after_count,
+            Equals(0),
+            "Expected no successful spending to be recorded in error case",
+        )
+
+    @given(
+        storage_index=storage_indexes(),
+        renew_secret=lease_renew_secrets(),
+        cancel_secret=lease_cancel_secrets(),
+        existing_sharenums=sharenum_sets(),
+        new_sharenums=sharenum_sets(),
+        size=sizes(),
+    )
+    def test_immutable_spending_metrics(
+        self,
+        storage_index,
+        renew_secret,
+        cancel_secret,
+        existing_sharenums,
+        new_sharenums,
+        size,
+    ):
+        """
+        When ZKAPs are spent to call *allocate_buckets* the number of passes spent
+        is recorded as a metric.
+        """
+        # Maybe create some existing shares that won't need to be paid for by
+        # the subsequent `allocate_buckets` operation - but of which the
+        # client is unaware.
+        write_toy_shares(
+            self.anonymous_storage_server,
+            storage_index,
+            renew_secret,
+            cancel_secret,
+            existing_sharenums,
+            size,
+            LocalReferenceable(None),
+        )
+
+        # The client will present this many passes.
+        num_passes = required_passes(self.pass_value, [size] * len(new_sharenums))
+        # But only this many need to be spent.
+        num_spent_passes = required_passes(
+            self.pass_value,
+            [size] * len(new_sharenums - existing_sharenums),
+        )
+        valid_passes = make_passes(
+            self.signing_key,
+            allocate_buckets_message(storage_index),
+            list(RandomToken.create() for i in range(num_passes)),
+        )
+
+        alreadygot, allocated = self.storage_server.doRemoteCall(
+            "allocate_buckets",
+            (),
+            dict(
+                passes=valid_passes,
+                storage_index=storage_index,
+                renew_secret=renew_secret,
+                cancel_secret=cancel_secret,
+                sharenums=new_sharenums,
+                allocated_size=size,
+                canary=LocalReferenceable(None),
+            ),
+        )
+
+        after_count = read_spending_success_histogram_total(self.storage_server)
+        after_bucket = read_spending_success_histogram_bucket(
+            self.storage_server, num_spent_passes
+        )
+
+        self.expectThat(
+            after_count,
+            Equals(1),
+            "Unexpected histogram sum value",
+        )
+        # If this bucket is 1 then all the other buckets must be 0, otherwise
+        # the sum above will be greater than 1.
+        self.assertThat(
+            after_bucket,
+            Equals(1),
+            "Unexpected histogram bucket value",
+        )
+
+    @given(
+        storage_index=storage_indexes(),
+        renew_secret=lease_renew_secrets(),
+        cancel_secret=lease_cancel_secrets(),
+        sharenums=sharenum_sets(),
+        allocated_size=sizes(),
+    )
+    def test_add_lease_metrics(
+        self,
+        storage_index,
+        renew_secret,
+        cancel_secret,
+        sharenums,
+        allocated_size,
+    ):
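+        """
+        When ZKAPs are spent to call *add_lease* the number of passes spent is
+        recorded as a metric.
+        """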
+        # Create some shares at a slot which will require lease renewal.
+        write_toy_shares(
+            self.anonymous_storage_server,
+            storage_index,
+            renew_secret,
+            cancel_secret,
+            sharenums,
+            allocated_size,
+            LocalReferenceable(None),
+        )
+
+        num_passes = required_passes(
+            self.storage_server._pass_value, [allocated_size] * len(sharenums)
+        )
+        valid_passes = make_passes(
+            self.signing_key,
+            add_lease_message(storage_index),
+            list(RandomToken.create() for i in range(num_passes)),
+        )
+
+        self.storage_server.doRemoteCall(
+            "add_lease",
+            (),
+            dict(
+                passes=valid_passes,
+                storage_index=storage_index,
+                renew_secret=renew_secret,
+                cancel_secret=cancel_secret,
+            ),
+        )
+
+        after_count = read_spending_success_histogram_total(self.storage_server)
+        after_bucket = read_spending_success_histogram_bucket(
+            self.storage_server, num_passes
+        )
+
+        self.expectThat(
+            after_count,
+            Equals(1),
+            "Unexpected histogram sum value",
+        )
+        self.assertThat(
+            after_bucket,
+            Equals(1),
+            "Unexpected histogram bucket value",
+        )
+
+    @given(
+        storage_index=storage_indexes(),
+        renew_secret=lease_renew_secrets(),
+        cancel_secret=lease_cancel_secrets(),
+        sharenums=sharenum_sets(),
+        allocated_size=sizes(),
+    )
+    def test_add_lease_metrics_on_failure(
+        self, storage_index, renew_secret, cancel_secret, sharenums, allocated_size
+    ):
+        """
+        If the ``add_lease`` operation fails then the successful pass spending
+        metric is not incremented.
+        """
+        # Put some shares up there to target with the add_lease operation.
+        write_toy_shares(
+            self.anonymous_storage_server,
+            storage_index,
+            renew_secret,
+            cancel_secret,
+            sharenums,
+            allocated_size,
+            LocalReferenceable(None),
+        )
+
+        num_passes = required_passes(
+            self.storage_server._pass_value, [allocated_size] * len(sharenums)
+        )
+        valid_passes = make_passes(
+            self.signing_key,
+            add_lease_message(storage_index),
+            list(RandomToken.create() for i in range(num_passes)),
+        )
+
+        # Tahoe doesn't make it very easy to make an add_lease operation fail
+        # so monkey-patch something broken in.  After 1.17.0 we can set
+        # `reserved_space` on StorageServer to a very large number and the
+        # server should refuse to allocate space for a *new* lease (which
+        # means we need to use a different renew secret for the next step).
+        self.anonymous_storage_server.remote_add_lease = lambda *a, **kw: 1 / 0
+
+        try:
+            self.storage_server.doRemoteCall(
+                "add_lease",
+                (),
+                dict(
+                    passes=valid_passes,
+                    storage_index=storage_index,
+                    renew_secret=renew_secret,
+                    cancel_secret=cancel_secret,
+                ),
+            )
+        except ZeroDivisionError:
+            pass
+        else:
+            self.fail("expected our ZeroDivisionError to be raised")
+
+        after_count = read_spending_success_histogram_total(self.storage_server)
+        self.expectThat(
+            after_count,
+            Equals(0),
+            "Expected no successful spending to be recorded in error case",
+        )