From 630e2c9f239db19d10d3f054609ff0bd504449bc Mon Sep 17 00:00:00 2001 From: Jean-Paul Calderone <exarkun@twistedmatrix.com> Date: Thu, 29 Jun 2023 07:28:40 -0400 Subject: [PATCH] Look up share placement info in parallel --- src/Tahoe/Download.hs | 44 ++++++++++++++++++++----------------------- 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/src/Tahoe/Download.hs b/src/Tahoe/Download.hs index 89ca61c..4f0f41f 100644 --- a/src/Tahoe/Download.hs +++ b/src/Tahoe/Download.hs @@ -62,22 +62,22 @@ download :: -- | The read capability for the application data. readCap -> -- | Get functions for interacting with a server given its URL. - LookupServer m -> + LookupServer IO -> -- | Either a description of how the recovery failed or the recovered -- application data. m (Either DownloadError LB.ByteString) download servers cap lookupServer = do - print' ("Going to download: " <> show (getStorageIndex $ getVerifiable cap)) + print' ("Downloading: " <> show (getStorageIndex $ getVerifiable cap)) let verifier = getVerifiable cap let storageIndex = getStorageIndex verifier -- TODO: If getRequiredTotal fails on the first storage server, we may -- need to try more. If it fails for all of them, we need to represent -- the failure coherently. - someParam <- firstRightM lookupServer (getRequiredTotal verifier) (Map.elems servers) + someParam <- liftIO $ firstRightM lookupServer (getRequiredTotal verifier) (Map.elems servers) case someParam of Left errs -> pure . Left $ if servers == mempty then NoConfiguredServers else NoReachableServers (StorageServerUnreachable <$> errs) Right (required, _) -> do - locationE <- locateShares servers lookupServer storageIndex (fromIntegral required) + locationE <- liftIO $ locateShares servers lookupServer storageIndex (fromIntegral required) print' "Finished locating shares" case locationE of Left err -> do @@ -86,9 +86,9 @@ download servers cap lookupServer = do Right discovered -> do print' "Found some shares, fetching them" -- XXX note shares can contain failures - shares <- executeDownloadTasks storageIndex (makeDownloadTasks =<< discovered) + shares <- liftIO $ executeDownloadTasks storageIndex (makeDownloadTasks =<< discovered) print' "Fetched the shares, decoding them" - s <- decodeShares cap shares required + s <- liftIO $ decodeShares cap shares required print' "Decoded them" pure s @@ -97,7 +97,7 @@ download servers cap lookupServer = do Nothings, return a list of the values in the Lefts. Otherwise, return the *first* Right. -} -firstRightM :: MonadIO m => (a -> m (Either b c)) -> (c -> m (Maybe d)) -> [a] -> m (Either [b] d) +firstRightM :: Monad m => (a -> m (Either b c)) -> (c -> m (Maybe d)) -> [a] -> m (Either [b] d) firstRightM _ _ [] = pure $ Left [] firstRightM f op (x : xs) = do s <- f x @@ -115,34 +115,32 @@ firstRightM f op (x : xs) = do results. -} executeDownloadTasks :: - MonadIO m => -- | The storage index of the shares to download. StorageIndex -> -- | The downloads to attempt. [DownloadTask] -> -- | The results of all successful downloads. - m [DownloadedShare] + IO [DownloadedShare] executeDownloadTasks storageIndex tasks = do - downloadResults <- liftIO $ mapConcurrently (downloadShare storageIndex) tasks + downloadResults <- mapConcurrently (downloadShare storageIndex) tasks pure . rights $ inject <$> downloadResults where inject (a, b) = (a,) <$> b -- | Find out which servers claim to have shares related to a given storage index. locateShares :: - MonadIO m => -- | Information about the servers from which to consider downloading shares -- representing the application data. Map.Map StorageServerID StorageServerAnnouncement -> -- | Get functions for interacting with a server given its URL. - LookupServer m -> + LookupServer IO -> -- | The storage index about which to retrieve information. B.ByteString -> -- | The number of shares we need to locate. If we cannot find at least -- this many shares the result will be an error. Word16 -> -- | Either an error or a guide to where shares are placed. - m (Either DownloadError [(StorageServer, Set.Set ShareNum)]) + IO (Either DownloadError [(StorageServer, Set.Set ShareNum)]) locateShares servers lookupServer storageIndex required = case Map.toList servers of [] -> pure . Left $ NoConfiguredServers @@ -152,7 +150,7 @@ locateShares servers lookupServer storageIndex required = ( problems :: [DiscoverError] , discovered :: [(StorageServer, Set.Set ShareNum)] ) <- - partitionEithers <$> mapM (discoverShares lookupServer storageIndex) serverList + partitionEithers <$> mapConcurrently (discoverShares lookupServer storageIndex) serverList if null discovered then pure . Left . NoReachableServers $ problems else @@ -164,13 +162,13 @@ locateShares servers lookupServer storageIndex required = decode them and decrypt the contents of possible. -} decodeShares :: - (MonadIO m, Readable readCap, Verifiable v, v ~ Verifier readCap) => + (Readable readCap, Verifiable v, v ~ Verifier readCap) => -- | The read capability which allows the contents to be decrypted. readCap -> -- | The results of downloading the shares. [DownloadedShare] -> Int -> - m (Either DownloadError LB.ByteString) + IO (Either DownloadError LB.ByteString) decodeShares r shares required = do -- Filter down to shares we actually got. let fewerShares = second (deserializeShare (getVerifiable r)) <$> shares @@ -190,11 +188,10 @@ countDistinctShares = Set.size . foldl' Set.union mempty . map snd question. -} discoverShares :: - MonadIO m => - LookupServer m -> + LookupServer IO -> StorageIndex -> (StorageServerID, StorageServerAnnouncement) -> - m (Either DiscoverError (StorageServer, Set.Set ShareNum)) + IO (Either DiscoverError (StorageServer, Set.Set ShareNum)) discoverShares lookupServer storageIndex (_sid, sann) = do print' "Looking up server from announcement" server <- lookupServer sann @@ -203,7 +200,7 @@ discoverShares lookupServer storageIndex (_sid, sann) = do Left e -> pure . Left . StorageServerUnreachable $ e Right ss@StorageServer{storageServerGetBuckets} -> do print' $ "Getting buckets for " <> show storageIndex - buckets <- liftIO $ try (storageServerGetBuckets storageIndex) + buckets <- try (storageServerGetBuckets storageIndex) let massaged = first (StorageServerCommunicationError . (displayException :: SomeException -> String)) buckets print' $ "Got them " <> show massaged pure $ (ss,) <$> massaged @@ -217,17 +214,16 @@ makeDownloadTasks (v, ks) = zip (Set.toList ks) (repeat v) -- | Download the bytes of a share from one (or more!) of the given servers. downloadShare :: - MonadIO m => -- | The storage index of the share to download. StorageIndex -> -- | Addressing information about the share to download. DownloadTask -> -- | The bytes of the share or some error that was encountered during -- download. - m (ShareNum, Either DownloadError LB.ByteString) + IO (ShareNum, Either DownloadError LB.ByteString) downloadShare storageIndex (shareNum, s) = do print' $ "Going to download " <> show storageIndex <> " " <> show shareNum - shareBytes <- liftIO $ try (storageServerRead s storageIndex shareNum) + shareBytes <- try (storageServerRead s storageIndex shareNum) let massaged = first (ShareDownloadError . (displayException :: SomeException -> String)) shareBytes print' "Downloaded it" pure (shareNum, LB.fromStrict <$> massaged) @@ -243,7 +239,7 @@ downloadDirectory :: -- | The read capability for the application data. DirectoryCapability readCap -> -- | Get functions for interacting with a server given its URL. - LookupServer m -> + LookupServer IO -> -- | Either a description of how the recovery failed or the recovered -- application data. m (Either DirectoryDownloadError Directory) -- GitLab