Newer
Older
#
# Create a new file via the web interface of a Tahoe-LAFS client node and then
# retrieve the contents of that file. Exit with success if the retrieved
# contents match what was uploaded originally.
#
from sys import argv
from os import urandom
from subprocess import check_output
from io import BytesIO
from time import sleep, ctime
from pprint import pprint
def main():
(clientDir,) = argv[1:]
someData = urandom(2 ** 16)
api_root = get_api_root(clientDir)
block_until_connected(api_root)
subject_cap = exercise_immutable(api_root, someData)
newDir = exercise_mkdir(api_root)
exercise_link_unlink(api_root, newDir, subject_cap)
def block_until_connected(api_root):
"""
Block until the Tahoe-LAFS node at the given API root reports it has
connected to at least one storage server.
"""
while True:
response = requests.get(
api_root.replace(query={u"t": u"json"}),
)
response.raise_for_status()
welcome = response.json()
servers = welcome["servers"]
connected = list(
server
for server
in servers
if server["connection_status"].startswith("Connected to ")
)
if len(connected) >= 1:
print(
"Connected to a server:\n"
"\t{nodeid}\n"
"\t{status}\n"
"\t{last_received_data}\n".format(
nodeid=connected[0]["nodeid"],
status=connected[0]["connection_status"],
last_received_data=ctime(connected[0]["last_received_data"]),
),
)
return
pprint(welcome)
sleep(0.1)
def exercise_immutable(api_root, someData):
cap = tahoe_put(api_root, someData)
dataReadBack = tahoe_get(api_root, cap)
assert someData == dataReadBack
def exercise_mkdir(api_root):
cap = tahoe_mkdir(api_root)
info = tahoe_stat(api_root, cap)
assert info
return info[1][u"rw_uri"]
def exercise_link_unlink(api_root, dir_cap, subject_cap):
tahoe_link(api_root, dir_cap, u"foo", subject_cap)
assert u"foo" in tahoe_stat(api_root, dir_cap)[1][u"children"]
tahoe_unlink(api_root, dir_cap, u"foo")
assert u"foo" not in tahoe_stat(api_root, dir_cap)[1][u"children"]
def get_api_root(path):
with open(path + u"/node.url") as f:
return hyperlink.URL.from_text(f.read().strip())
def tahoe_put(api_root, data, **kwargs):
response = requests.put(
api_root.child(u"uri").to_uri(),
BytesIO(data),
)
response.raise_for_status()
return response.text
def tahoe_get(api_root, cap):
response = requests.get(
api_root.child(u"uri", cap).to_uri(),
stream=True,
)
response.raise_for_status()
return response.raw.read()
def tahoe_mkdir(api_root):
response = requests.post(
api_root.child(u"uri").replace(query={u"t": u"mkdir", u"format": u"mdmf"}).to_uri(),
)
response.raise_for_status()
return response.text
def tahoe_link(api_root, dir_cap, name, subject_cap):
response = requests.put(
api_root.child(u"uri", dir_cap, name).replace(query={u"t": u"uri"}).to_uri(),
BytesIO(subject_cap.encode("ascii")),
)
response.raise_for_status()
return response.text
def tahoe_stat(api_root, cap):
response = requests.get(
api_root.child(u"uri", cap).replace(query={u"t": u"json"}).to_uri(),
)
response.raise_for_status()
return response.json()
def tahoe_unlink(api_root, dir_cap, name):
response = requests.delete(
api_root.child(u"uri", dir_cap, name).to_uri(),
)
response.raise_for_status()
return response.text
if __name__ == u'__main__':