Skip to content
Snippets Groups Projects
exercise-storage.py 3.77 KiB
Newer Older
  • Learn to ignore specific revisions
  • #!/usr/bin/env python3
    
    #
    # Create a new file via the web interface of a Tahoe-LAFS client node and then
    # retrieve the contents of that file.  Exit with success if the retrieved
    # contents match what was uploaded originally.
    #
    
    
    from sys import argv
    from os import urandom
    
    from subprocess import check_output
    from io import BytesIO
    
    from time import sleep, ctime
    from pprint import pprint
    
    
    def main():
        (clientDir,) = argv[1:]
    
        someData = urandom(2 ** 16)
    
    
        api_root = get_api_root(clientDir)
    
        block_until_connected(api_root)
    
    
        subject_cap = exercise_immutable(api_root, someData)
        newDir = exercise_mkdir(api_root)
        exercise_link_unlink(api_root, newDir, subject_cap)
    
    def block_until_connected(api_root):
        """
        Block until the Tahoe-LAFS node at the given API root reports it has
        connected to at least one storage server.
        """
        while True:
            response = requests.get(
                api_root.replace(query={u"t": u"json"}),
            )
            response.raise_for_status()
            welcome = response.json()
            servers = welcome["servers"]
            connected = list(
                server
                for server
                in servers
                if server["connection_status"].startswith("Connected to ")
            )
            if len(connected) >= 1:
                print(
                    "Connected to a server:\n"
                    "\t{nodeid}\n"
                    "\t{status}\n"
                    "\t{last_received_data}\n".format(
                        nodeid=connected[0]["nodeid"],
                        status=connected[0]["connection_status"],
                        last_received_data=ctime(connected[0]["last_received_data"]),
                    ),
                )
                return
            pprint(welcome)
            sleep(0.1)
    
    
    def exercise_immutable(api_root, someData):
        cap = tahoe_put(api_root, someData)
        dataReadBack = tahoe_get(api_root, cap)
    
        assert someData == dataReadBack
    
    def exercise_mkdir(api_root):
        cap = tahoe_mkdir(api_root)
        info = tahoe_stat(api_root, cap)
        assert info
    
        return info[1][u"rw_uri"]
    
    def exercise_link_unlink(api_root, dir_cap, subject_cap):
        tahoe_link(api_root, dir_cap, u"foo", subject_cap)
        assert u"foo" in tahoe_stat(api_root, dir_cap)[1][u"children"]
        tahoe_unlink(api_root, dir_cap, u"foo")
        assert u"foo" not in tahoe_stat(api_root, dir_cap)[1][u"children"]
    
    def get_api_root(path):
        with open(path + u"/node.url") as f:
    
            return hyperlink.URL.from_text(f.read().strip())
    
    def tahoe_put(api_root, data, **kwargs):
    
        response = requests.put(
            api_root.child(u"uri").to_uri(),
            BytesIO(data),
        )
    
        response.raise_for_status()
        return response.text
    
    
    def tahoe_get(api_root, cap):
    
        response = requests.get(
            api_root.child(u"uri", cap).to_uri(),
            stream=True,
        )
    
        response.raise_for_status()
        return response.raw.read()
    
    def tahoe_mkdir(api_root):
    
        response = requests.post(
            api_root.child(u"uri").replace(query={u"t": u"mkdir", u"format": u"mdmf"}).to_uri(),
        )
    
        response.raise_for_status()
        return response.text
    
    def tahoe_link(api_root, dir_cap, name, subject_cap):
    
        response = requests.put(
    
            api_root.child(u"uri", dir_cap, name).replace(query={u"t": u"uri"}).to_uri(),
    
            BytesIO(subject_cap.encode("ascii")),
    
        )
        response.raise_for_status()
        return response.text
    
    def tahoe_stat(api_root, cap):
    
        response = requests.get(
            api_root.child(u"uri", cap).replace(query={u"t": u"json"}).to_uri(),
        )
    
        response.raise_for_status()
    
        return response.json()
    
    def tahoe_unlink(api_root, dir_cap, name):
        response = requests.delete(
            api_root.child(u"uri", dir_cap, name).to_uri(),
        )
        response.raise_for_status()
        return response.text
    
    if __name__ == u'__main__':