Registry functions return Poll to enable parallel fetching of index data #10064

Merged
merged 4 commits into from
Mar 9, 2022
Changes from 2 commits
9 changes: 7 additions & 2 deletions crates/resolver-tests/src/lib.rs
Expand Up @@ -7,6 +7,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fmt;
use std::fmt::Write;
use std::rc::Rc;
use std::task::Poll;
use std::time::Instant;

use cargo::core::dependency::DepKind;
Expand Down Expand Up @@ -129,14 +130,14 @@ pub fn resolve_with_config_raw(
dep: &Dependency,
f: &mut dyn FnMut(Summary),
fuzzy: bool,
) -> CargoResult<()> {
) -> Poll<CargoResult<()>> {
for summary in self.list.iter() {
if fuzzy || dep.matches(summary) {
self.used.insert(summary.package_id());
f(summary.clone());
}
}
Ok(())
Poll::Ready(Ok(()))
}

fn describe_source(&self, _src: SourceId) -> String {
Expand All @@ -146,6 +147,10 @@ pub fn resolve_with_config_raw(
fn is_replaced(&self, _src: SourceId) -> bool {
false
}

fn block_until_ready(&mut self) -> CargoResult<()> {
Ok(())
}
}
impl<'a> Drop for MyRegistry<'a> {
fn drop(&mut self) {
Expand Down
39 changes: 30 additions & 9 deletions src/cargo/core/compiler/future_incompat.rs
Expand Up @@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fmt::Write as _;
use std::io::{Read, Write};
use std::task::Poll;

pub const REPORT_PREAMBLE: &str = "\
The following warnings were discovered during the build. These warnings are an
Expand Down Expand Up @@ -264,7 +265,7 @@ fn get_updates(ws: &Workspace<'_>, package_ids: &BTreeSet<PackageId>) -> Option<
let _lock = ws.config().acquire_package_cache_lock().ok()?;
// Create a set of updated registry sources.
let map = SourceConfigMap::new(ws.config()).ok()?;
let package_ids: BTreeSet<_> = package_ids
let mut package_ids: BTreeSet<_> = package_ids
.iter()
.filter(|pkg_id| pkg_id.source_id().is_registry())
.collect();
Expand All @@ -279,15 +280,35 @@ fn get_updates(ws: &Workspace<'_>, package_ids: &BTreeSet<PackageId>) -> Option<
Some((sid, source))
})
.collect();
// Query the sources for new versions.

// Query the sources for new versions, mapping `package_ids` into `summaries`.
let mut summaries = Vec::new();
while !package_ids.is_empty() {
package_ids.retain(|&pkg_id| {
let source = match sources.get_mut(&pkg_id.source_id()) {
Some(s) => s,
None => return false,
};
let dep = match Dependency::parse(pkg_id.name(), None, pkg_id.source_id()) {
Ok(dep) => dep,
Err(_) => return false,
};
match source.query_vec(&dep) {
Poll::Ready(Ok(sum)) => {
summaries.push((pkg_id, sum));
false
}
Poll::Ready(Err(_)) => false,
Poll::Pending => true,
}
});
for (_, source) in sources.iter_mut() {
source.block_until_ready().ok()?;
Contributor

So I think one of the fundamental questions is: can block_until_ready spuriously wake?
I.e., if block_until_ready returns, can the next call to query return Poll::Pending?

If yes, then this loop is correct, but we should document it.

If no, then this loop seems like overkill, as it has to pass on the second iteration. (The same pattern also appears in other loops, but I'm only gonna comment here.)

Another fundamental question: what happens if you call block_until_ready on a source that has not yet returned Poll::Pending?

To be specific, do we iterate over sources.iter_mut() or only the ones that are still in package_ids?

Contributor Author

For a given source and query parameters, a query call that previously returned Poll::Pending must return Poll::Ready after calling block_until_ready. If the query parameters are different (such as querying for a package that hasn't been queried before), then query may return Poll::Pending, even after a block_until_ready call.
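The contract described above can be sketched with a toy source. This is only an illustration under stated assumptions: `MockSource`, `publish`, and `demo_contract` are hypothetical names, not Cargo's `Source` trait or its real registry code, and the "remote" is an in-memory map.

```rust
use std::collections::{HashMap, HashSet};
use std::task::Poll;

/// Hypothetical stand-in for a registry source (not Cargo's `Source` trait).
#[derive(Default)]
pub struct MockSource {
    /// Data that would live on a remote index.
    remote: HashMap<String, Vec<String>>,
    /// Data that has already been "downloaded".
    cache: HashMap<String, Vec<String>>,
    /// Names whose query returned `Pending` and that await a fetch.
    requested: HashSet<String>,
}

impl MockSource {
    /// Add a version to the pretend remote index.
    pub fn publish(&mut self, name: &str, version: &str) {
        self.remote
            .entry(name.to_string())
            .or_default()
            .push(version.to_string());
    }

    /// Return cached versions, or record the request and report `Pending`.
    pub fn query(&mut self, name: &str) -> Poll<Result<Vec<String>, String>> {
        match self.cache.get(name) {
            Some(versions) => Poll::Ready(Ok(versions.clone())),
            None => {
                self.requested.insert(name.to_string());
                Poll::Pending
            }
        }
    }

    /// "Download" everything that was requested since the last call.
    pub fn block_until_ready(&mut self) -> Result<(), String> {
        for name in self.requested.drain() {
            let versions = self.remote.get(&name).cloned().unwrap_or_default();
            self.cache.insert(name, versions);
        }
        Ok(())
    }
}

/// Returns (first query pending, same query ready after blocking,
/// new query pending even after blocking).
pub fn demo_contract() -> (bool, bool, bool) {
    let mut source = MockSource::default();
    source.publish("serde", "1.0.0");
    source.publish("rand", "0.8.5");

    let first = source.query("serde").is_pending();
    source.block_until_ready().unwrap();
    let same_again = source.query("serde").is_ready();
    // Different query parameters: nothing was requested for "rand" yet.
    let other = source.query("rand").is_pending();
    (first, same_again, other)
}
```

In this sketch a repeated query is ready after one `block_until_ready` call, while a query with new parameters can still be pending.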

Calling block_until_ready if a source hasn't yet returned a Poll::Pending is an interesting question. If you call invalidate_cache + block_until_ready, the git-based sources will do a fetch.

I agree we should be more precise here and only call block_until_ready for the sources remaining in package_ids.
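One way to block only on the sources that still have pending package ids might look like the sketch below. `ToySource`, `query_all`, and the numeric source ids are assumptions made for illustration; the real code works with `SourceId`, `PackageId`, and `query_vec`.

```rust
use std::collections::{BTreeSet, HashMap};
use std::task::Poll;

/// Hypothetical source: queries are pending until the source has been loaded.
#[derive(Default)]
pub struct ToySource {
    loaded: bool,
    /// Number of times `block_until_ready` was called on this source.
    pub blocked: u32,
}

impl ToySource {
    pub fn query(&mut self, name: &str) -> Poll<String> {
        if self.loaded {
            Poll::Ready(format!("{} summaries", name))
        } else {
            Poll::Pending
        }
    }

    pub fn block_until_ready(&mut self) {
        self.loaded = true;
        self.blocked += 1;
    }
}

/// Resolve every (source id, package name) pair, blocking only on the
/// sources that still have unresolved entries in `pending`.
pub fn query_all(
    sources: &mut HashMap<u32, ToySource>,
    mut pending: BTreeSet<(u32, &str)>,
) -> Vec<String> {
    let mut out = Vec::new();
    while !pending.is_empty() {
        // Keep only the entries whose query is still pending.
        pending.retain(|&(sid, name)| match sources.get_mut(&sid) {
            None => false,
            Some(source) => match source.query(name) {
                Poll::Ready(summary) => {
                    out.push(summary);
                    false
                }
                Poll::Pending => true,
            },
        });
        // Block only on sources that are still referenced by `pending`.
        let waiting: BTreeSet<u32> = pending.iter().map(|&(sid, _)| sid).collect();
        for sid in waiting {
            if let Some(source) = sources.get_mut(&sid) {
                source.block_until_ready();
            }
        }
    }
    out
}

/// Source 1 is already loaded, so only source 2 should be blocked on.
pub fn demo_selective() -> (Vec<String>, u32, u32) {
    let mut sources = HashMap::new();
    sources.insert(1, ToySource { loaded: true, blocked: 0 });
    sources.insert(2, ToySource::default());
    let pending = BTreeSet::from([(1, "serde"), (2, "rand")]);
    let out = query_all(&mut sources, pending);
    (out, sources[&1].blocked, sources[&2].blocked)
}
```

The cost of this precision is the extra `waiting` set built on each pass.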

Contributor

For a given source and query parameters, a query call that previously returned Poll::Pending must return Poll::Ready after calling block_until_ready. If the query parameters are different (such as querying for a package that hasn't been queried before), then query may return Poll::Pending, even after a block_until_ready call.

If we trust that "must", then we don't need a loop. So instead of "loops" like:

let config = loop {
    match self.config()? {
        Poll::Ready(cfg) => break cfg.unwrap(),
        Poll::Pending => self.block_until_ready()?,
    }
};

We can have:

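let config = match self.config()? {
    Poll::Ready(cfg) => cfg.unwrap(),
    Poll::Pending => {
        self.block_until_ready()?;
        // `Poll` has no `expect`, so match on the second result explicitly.
        match self.config()? {
            Poll::Ready(cfg) => cfg.unwrap(),
            Poll::Pending => panic!("must get Ready after block_until_ready"),
        }
    }
};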

Of course, we will probably want some kind of helper to DRY up the redundant self.config()?, and possibly a helper to deal with the two places where we have a vec of things to query.

Overnight, the possibility of weakening it to "should" started to grow on me. Someone implementing the trait should make sure that the second query always returns Ready, but someone using the trait has to loop, calling block_until_ready until the query returns Ready.

Whatever we decide, we should document it on the trait.
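A helper along the lines suggested above, written for the weaker "should" contract, could be sketched as follows. `poll_until_ready`, `FlakySource`, and `demo_retry` are hypothetical names for illustration and are not part of Cargo.

```rust
use std::task::Poll;

/// Hypothetical helper: call `poll` until it is ready, calling `block`
/// after every `Pending`. Callers do not rely on a single block sufficing.
pub fn poll_until_ready<S, T, E>(
    state: &mut S,
    mut poll: impl FnMut(&mut S) -> Poll<Result<T, E>>,
    mut block: impl FnMut(&mut S) -> Result<(), E>,
) -> Result<T, E> {
    loop {
        match poll(state) {
            Poll::Ready(result) => return result,
            Poll::Pending => block(state)?,
        }
    }
}

/// Toy source that needs `remaining` blocking calls before it is ready.
pub struct FlakySource {
    remaining: u32,
    blocks: u32,
}

/// Returns the queried value and how many times the helper had to block.
pub fn demo_retry() -> (u32, u32) {
    let mut source = FlakySource { remaining: 2, blocks: 0 };
    let value = poll_until_ready(
        &mut source,
        |s| {
            if s.remaining == 0 {
                Poll::Ready(Ok::<u32, String>(42))
            } else {
                Poll::Pending
            }
        },
        |s| {
            s.remaining -= 1;
            s.blocks += 1;
            Ok(())
        },
    )
    .unwrap();
    (value, source.blocks)
}
```

A well-behaved implementation would make the loop body run at most once, but the caller stays correct even when the implementation needs more than one blocking call.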

Calling block_until_ready if a source hasn't yet returned a Poll::Pending is an interesting question. If you call invalidate_cache + block_until_ready, the git-based sources will do a fetch.

I think we should define block_until_ready as a NOP when it is called before any Poll::Pending or invalidate_cache. For example, this loop calls block_until_ready on all sources even if only some of them need it:

for (source_id, source) in self.sources.sources_mut() {
    source
        .block_until_ready()
        .with_context(|| format!("Unable to update {}", source_id))?;
}

Keeping track of which ones need it seems like a lot of extra work.
Can we document on the trait when it is a NOP, and point out that avoiding unneeded calls to block_until_ready is a recommended optimization?
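The NOP behavior proposed above could be sketched like this, assuming the source itself tracks whether anything is outstanding. `LazySource`, `mark_pending`, and `demo_nop` are illustrative names only, and the `fetches` counter stands in for real network or git work.

```rust
use std::collections::HashSet;

/// Hypothetical source that remembers which names are awaiting a fetch.
#[derive(Default)]
pub struct LazySource {
    requested: HashSet<String>,
    /// Number of (pretend) network fetches performed.
    pub fetches: u32,
}

impl LazySource {
    /// Stand-in for a query that returned `Poll::Pending`.
    pub fn mark_pending(&mut self, name: &str) {
        self.requested.insert(name.to_string());
    }

    /// Does nothing unless a previous query is still outstanding.
    pub fn block_until_ready(&mut self) -> Result<(), String> {
        if self.requested.is_empty() {
            return Ok(()); // NOP: nothing returned Pending since the last call
        }
        self.fetches += 1;
        self.requested.clear();
        Ok(())
    }
}

/// Blocks on every source twice; only the source with a pending query fetches.
pub fn demo_nop() -> Vec<u32> {
    let mut sources = vec![LazySource::default(), LazySource::default()];
    sources[1].mark_pending("serde");
    for _ in 0..2 {
        for source in sources.iter_mut() {
            source.block_until_ready().unwrap();
        }
    }
    sources.iter().map(|s| s.fetches).collect()
}
```

With this shape, callers can loop over all sources without tracking which ones need it, and the unneeded calls cost only a check.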

Contributor Author

@arlosi arlosi Mar 7, 2022

Ok, I've changed it to "should return Poll::Ready after block_until_ready" and updated the doc comments. Let me know if it makes more sense now!

}
}

let mut updates = String::new();
for pkg_id in package_ids {
let source = match sources.get_mut(&pkg_id.source_id()) {
Some(s) => s,
None => continue,
};
let dep = Dependency::parse(pkg_id.name(), None, pkg_id.source_id()).ok()?;
let summaries = source.query_vec(&dep).ok()?;
for (pkg_id, summaries) in summaries {
let mut updated_versions: Vec<_> = summaries
.iter()
.map(|summary| summary.version())
Expand Down