Newer
Older
# Copyright 2019 PrivateStorage.io, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Tests for the Tahoe-LAFS plugin.
"""
from zope.interface import (
implementer,
)
from fixtures import (
TempDir,
)
from testtools import (
TestCase,
)
from testtools.matchers import (
AfterPreprocessing,
)
from testtools.twistedsupport import (
succeeded,
)
from hypothesis import (
given,
)
from foolscap.broker import (
Broker,
)
from foolscap.ipb import (
IReferenceable,
IRemotelyCallable,
)
from allmydata.interfaces import (
IFoolscapStoragePlugin,
)
from twisted.plugin import (
getPlugins,
)
from twisted.test.proto_helpers import (
StringTransport,
)
from twisted.web.resource import (
IResource,
)
)
from .matchers import (
Provides,
)
@implementer(RIStorageServer)
class StubStorageServer(object):
pass
def get_rref():
return None
Tests for ``twisted.plugins.zkapauthorizer.storage_server``.
"""
def test_discoverable(self):
"""
The plugin can be discovered.
"""
self.assertThat(
getPlugins(IFoolscapStoragePlugin),
Contains(storage_server),
)
def test_provides_interface(self):
"""
``storage_server`` provides ``IFoolscapStoragePlugin``.
"""
self.assertThat(
storage_server,
Provides([IFoolscapStoragePlugin]),
)
class ServerPluginTests(TestCase):
"""
Tests for the plugin's implementation of
``IFoolscapStoragePlugin.get_storage_server``.
"""
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
@given(configurations())
def test_returns_announceable(self, configuration):
"""
``storage_server.get_storage_server`` returns an instance which provides
``IAnnounceableStorageServer``.
"""
storage_server_deferred = storage_server.get_storage_server(
configuration,
get_anonymous_storage_server,
)
self.assertThat(
storage_server_deferred,
succeeded(Provides([IAnnounceableStorageServer])),
)
@given(configurations())
def test_returns_referenceable(self, configuration):
"""
The storage server attached to the result of
``storage_server.get_storage_server`` provides ``IReferenceable`` and
``IRemotelyCallable``.
"""
storage_server_deferred = storage_server.get_storage_server(
configuration,
get_anonymous_storage_server,
)
self.assertThat(
storage_server_deferred,
succeeded(
AfterPreprocessing(
lambda ann: ann.storage_server,
Provides([IReferenceable, IRemotelyCallable]),
),
),
)
@given(configurations())
def test_returns_serializable(self, configuration):
"""
The storage server attached to the result of
``storage_server.get_storage_server`` can be serialized by a banana
Broker (for Foolscap).
"""
storage_server_deferred = storage_server.get_storage_server(
configuration,
get_anonymous_storage_server,
)
broker = Broker(None)
broker.makeConnection(StringTransport())
self.expectThat(
storage_server_deferred,
succeeded(
AfterPreprocessing(
lambda ann: broker.send(ann.storage_server),
Always(),
),
),
)
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
@given(configurations())
def test_returns_hashable(self, configuration):
"""
The storage server attached to the result of
``storage_server.get_storage_server`` is hashable for use as a Python
dictionary key.
This is another requirement of Foolscap.
"""
storage_server_deferred = storage_server.get_storage_server(
configuration,
get_anonymous_storage_server,
)
broker = Broker(None)
broker.makeConnection(StringTransport())
self.expectThat(
storage_server_deferred,
succeeded(
AfterPreprocessing(
lambda ann: hash(ann.storage_server),
Always(),
),
),
)
class ClientPluginTests(TestCase):
"""
Tests for the plugin's implementation of
``IFoolscapStoragePlugin.get_storage_client``.
"""
@given(configurations(), announcements())
def test_interface(self, configuration, announcement):
"""
``get_storage_client`` returns a ``Deferred`` that fires with an object
which provides ``IStorageServer``.
"""
storage_client_deferred = storage_server.get_storage_client(
configuration,
announcement,
get_rref,
)
self.assertThat(
storage_client_deferred,
succeeded(Provides([IStorageServer])),
)
class ClientResourceTests(TestCase):
"""
Tests for the plugin's implementation of
``IFoolscapStoragePlugin.get_client_resource``.
"""
@given(tahoe_configs())
def test_interface(self, get_config):
"""
``get_client_resource`` returns an object that provides ``IResource``.
"""
tempdir = self.useFixture(TempDir())
nodedir = tempdir.join(b"node")
config = get_config(nodedir, b"tub.port")
self.assertThat(
storage_server.get_client_resource(config),
Provides([IResource]),
)