Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
G
gbs-downloader
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
This is an archived project. Repository and other project resources are read-only.
Show more breadcrumbs
PrivateStorage
gbs-downloader
Commits
1d353f04
Commit
1d353f04
authored
1 year ago
by
Jean-Paul Calderone
Browse files
Options
Downloads
Plain Diff
Merge branch '11.fetch-shares-in-parallel' into 'main'
fetch all the shares at the same time Closes
#11
See merge request
!12
parents
915f54ce
630e2c9f
No related branches found
No related tags found
1 merge request
!12
fetch all the shares at the same time
Pipeline
#5022
passed
1 year ago
Stage: test
Changes
2
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
gbs-downloader.cabal
+2
-0
2 additions, 0 deletions
gbs-downloader.cabal
src/Tahoe/Download.hs
+21
-24
21 additions, 24 deletions
src/Tahoe/Download.hs
with
23 additions
and
24 deletions
gbs-downloader.cabal
+
2
−
0
View file @
1d353f04
...
...
@@ -95,6 +95,7 @@ library
-- Other library packages from which modules are imported.
build-depends:
, aeson
, async
, base
, base32
, base64-bytestring
...
...
@@ -206,6 +207,7 @@ test-suite gbs-downloader-test
-- Base language which the package is written in.
default-language: Haskell2010
ghc-options: -threaded
-- Modules included in this executable, other than Main.
other-modules: Generators
...
...
This diff is collapsed.
Click to expand it.
src/Tahoe/Download.hs
+
21
−
24
View file @
1d353f04
...
...
@@ -17,6 +17,7 @@ module Tahoe.Download (
getShareNumbers
,
)
where
import
Control.Concurrent.Async
(
mapConcurrently
)
import
Control.Exception
(
Exception
(
displayException
),
SomeException
,
try
)
import
Control.Monad.IO.Class
(
MonadIO
(
liftIO
))
import
Data.Bifunctor
(
Bifunctor
(
first
,
second
))
...
...
@@ -61,22 +62,22 @@ download ::
-- | The read capability for the application data.
readCap
->
-- | Get functions for interacting with a server given its URL.
LookupServer
m
->
LookupServer
IO
->
-- | Either a description of how the recovery failed or the recovered
-- application data.
m
(
Either
DownloadError
LB
.
ByteString
)
download
servers
cap
lookupServer
=
do
print'
(
"
Going to d
ownload: "
<>
show
(
getStorageIndex
$
getVerifiable
cap
))
print'
(
"
D
ownload
ing
: "
<>
show
(
getStorageIndex
$
getVerifiable
cap
))
let
verifier
=
getVerifiable
cap
let
storageIndex
=
getStorageIndex
verifier
-- TODO: If getRequiredTotal fails on the first storage server, we may
-- need to try more. If it fails for all of them, we need to represent
-- the failure coherently.
someParam
<-
firstRightM
lookupServer
(
getRequiredTotal
verifier
)
(
Map
.
elems
servers
)
someParam
<-
liftIO
$
firstRightM
lookupServer
(
getRequiredTotal
verifier
)
(
Map
.
elems
servers
)
case
someParam
of
Left
errs
->
pure
.
Left
$
if
servers
==
mempty
then
NoConfiguredServers
else
NoReachableServers
(
StorageServerUnreachable
<$>
errs
)
Right
(
required
,
_
)
->
do
locationE
<-
locateShares
servers
lookupServer
storageIndex
(
fromIntegral
required
)
locationE
<-
liftIO
$
locateShares
servers
lookupServer
storageIndex
(
fromIntegral
required
)
print'
"Finished locating shares"
case
locationE
of
Left
err
->
do
...
...
@@ -85,9 +86,9 @@ download servers cap lookupServer = do
Right
discovered
->
do
print'
"Found some shares, fetching them"
-- XXX note shares can contain failures
shares
<-
executeDownloadTasks
storageIndex
(
makeDownloadTasks
=<<
discovered
)
shares
<-
liftIO
$
executeDownloadTasks
storageIndex
(
makeDownloadTasks
=<<
discovered
)
print'
"Fetched the shares, decoding them"
s
<-
decodeShares
cap
shares
required
s
<-
liftIO
$
decodeShares
cap
shares
required
print'
"Decoded them"
pure
s
...
...
@@ -96,7 +97,7 @@ download servers cap lookupServer = do
Nothings, return a list of the values in the Lefts. Otherwise, return the
*first* Right.
-}
firstRightM
::
Monad
IO
m
=>
(
a
->
m
(
Either
b
c
))
->
(
c
->
m
(
Maybe
d
))
->
[
a
]
->
m
(
Either
[
b
]
d
)
firstRightM
::
Monad
m
=>
(
a
->
m
(
Either
b
c
))
->
(
c
->
m
(
Maybe
d
))
->
[
a
]
->
m
(
Either
[
b
]
d
)
firstRightM
_
_
[]
=
pure
$
Left
[]
firstRightM
f
op
(
x
:
xs
)
=
do
s
<-
f
x
...
...
@@ -114,34 +115,32 @@ firstRightM f op (x : xs) = do
results.
-}
executeDownloadTasks
::
MonadIO
m
=>
-- | The storage index of the shares to download.
StorageIndex
->
-- | The downloads to attempt.
[
DownloadTask
]
->
-- | The results of all successful downloads.
m
[
DownloadedShare
]
IO
[
DownloadedShare
]
executeDownloadTasks
storageIndex
tasks
=
do
downloadResults
<-
map
M
(
downloadShare
storageIndex
)
tasks
downloadResults
<-
map
Concurrently
(
downloadShare
storageIndex
)
tasks
pure
.
rights
$
inject
<$>
downloadResults
where
inject
(
a
,
b
)
=
(
a
,)
<$>
b
-- | Find out which servers claim to have shares related to a given storage index.
locateShares
::
MonadIO
m
=>
-- | Information about the servers from which to consider downloading shares
-- representing the application data.
Map
.
Map
StorageServerID
StorageServerAnnouncement
->
-- | Get functions for interacting with a server given its URL.
LookupServer
m
->
LookupServer
IO
->
-- | The storage index about which to retrieve information.
B
.
ByteString
->
-- | The number of shares we need to locate. If we cannot find at least
-- this many shares the result will be an error.
Word16
->
-- | Either an error or a guide to where shares are placed.
m
(
Either
DownloadError
[(
StorageServer
,
Set
.
Set
ShareNum
)])
IO
(
Either
DownloadError
[(
StorageServer
,
Set
.
Set
ShareNum
)])
locateShares
servers
lookupServer
storageIndex
required
=
case
Map
.
toList
servers
of
[]
->
pure
.
Left
$
NoConfiguredServers
...
...
@@ -151,7 +150,7 @@ locateShares servers lookupServer storageIndex required =
(
problems
::
[
DiscoverError
]
,
discovered
::
[(
StorageServer
,
Set
.
Set
ShareNum
)]
)
<-
partitionEithers
<$>
map
M
(
discoverShares
lookupServer
storageIndex
)
serverList
partitionEithers
<$>
map
Concurrently
(
discoverShares
lookupServer
storageIndex
)
serverList
if
null
discovered
then
pure
.
Left
.
NoReachableServers
$
problems
else
...
...
@@ -163,13 +162,13 @@ locateShares servers lookupServer storageIndex required =
decode them and decrypt the contents of possible.
-}
decodeShares
::
(
MonadIO
m
,
Readable
readCap
,
Verifiable
v
,
v
~
Verifier
readCap
)
=>
(
Readable
readCap
,
Verifiable
v
,
v
~
Verifier
readCap
)
=>
-- | The read capability which allows the contents to be decrypted.
readCap
->
-- | The results of downloading the shares.
[
DownloadedShare
]
->
Int
->
m
(
Either
DownloadError
LB
.
ByteString
)
IO
(
Either
DownloadError
LB
.
ByteString
)
decodeShares
r
shares
required
=
do
-- Filter down to shares we actually got.
let
fewerShares
=
second
(
deserializeShare
(
getVerifiable
r
))
<$>
shares
...
...
@@ -189,11 +188,10 @@ countDistinctShares = Set.size . foldl' Set.union mempty . map snd
question.
-}
discoverShares
::
MonadIO
m
=>
LookupServer
m
->
LookupServer
IO
->
StorageIndex
->
(
StorageServerID
,
StorageServerAnnouncement
)
->
m
(
Either
DiscoverError
(
StorageServer
,
Set
.
Set
ShareNum
))
IO
(
Either
DiscoverError
(
StorageServer
,
Set
.
Set
ShareNum
))
discoverShares
lookupServer
storageIndex
(
_sid
,
sann
)
=
do
print'
"Looking up server from announcement"
server
<-
lookupServer
sann
...
...
@@ -202,7 +200,7 @@ discoverShares lookupServer storageIndex (_sid, sann) = do
Left
e
->
pure
.
Left
.
StorageServerUnreachable
$
e
Right
ss
@
StorageServer
{
storageServerGetBuckets
}
->
do
print'
$
"Getting buckets for "
<>
show
storageIndex
buckets
<-
liftIO
$
try
(
storageServerGetBuckets
storageIndex
)
buckets
<-
try
(
storageServerGetBuckets
storageIndex
)
let
massaged
=
first
(
StorageServerCommunicationError
.
(
displayException
::
SomeException
->
String
))
buckets
print'
$
"Got them "
<>
show
massaged
pure
$
(
ss
,)
<$>
massaged
...
...
@@ -216,17 +214,16 @@ makeDownloadTasks (v, ks) = zip (Set.toList ks) (repeat v)
-- | Download the bytes of a share from one (or more!) of the given servers.
downloadShare
::
MonadIO
m
=>
-- | The storage index of the share to download.
StorageIndex
->
-- | Addressing information about the share to download.
DownloadTask
->
-- | The bytes of the share or some error that was encountered during
-- download.
m
(
ShareNum
,
Either
DownloadError
LB
.
ByteString
)
IO
(
ShareNum
,
Either
DownloadError
LB
.
ByteString
)
downloadShare
storageIndex
(
shareNum
,
s
)
=
do
print'
$
"Going to download "
<>
show
storageIndex
<>
" "
<>
show
shareNum
shareBytes
<-
liftIO
$
try
(
storageServerRead
s
storageIndex
shareNum
)
shareBytes
<-
try
(
storageServerRead
s
storageIndex
shareNum
)
let
massaged
=
first
(
ShareDownloadError
.
(
displayException
::
SomeException
->
String
))
shareBytes
print'
"Downloaded it"
pure
(
shareNum
,
LB
.
fromStrict
<$>
massaged
)
...
...
@@ -242,7 +239,7 @@ downloadDirectory ::
-- | The read capability for the application data.
DirectoryCapability
readCap
->
-- | Get functions for interacting with a server given its URL.
LookupServer
m
->
LookupServer
IO
->
-- | Either a description of how the recovery failed or the recovered
-- application data.
m
(
Either
DirectoryDownloadError
Directory
)
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment