[wip] Download crates in parallel #5161

Closed · wants to merge 1 commit
3 changes: 2 additions & 1 deletion Cargo.toml
@@ -21,7 +21,7 @@ atty = "0.2"
 crates-io = { path = "src/crates-io", version = "0.16" }
 crossbeam = "0.3"
 crypto-hash = "0.3"
-curl = "0.4.6"
+curl = "0.4.11"
 docopt = "0.8.1"
 env_logger = "0.5"
 failure = "0.1.1"
@@ -91,3 +91,4 @@ doc = false
 [[test]]
 name = "testsuite"
 path = "tests/testsuite/lib.rs"
+
38 changes: 24 additions & 14 deletions src/cargo/core/package.rs
@@ -197,22 +197,32 @@ impl<'cfg> PackageSet<'cfg> {
         Box::new(self.packages.keys())
     }

-    pub fn get(&self, id: &PackageId) -> CargoResult<&Package> {
-        let slot = self.packages.get(id).ok_or_else(|| {
-            internal(format!("couldn't find `{}` in package set", id))
-        })?;
-        if let Some(pkg) = slot.borrow() {
-            return Ok(pkg)
+    pub fn get(&self, ids: &[&PackageId]) -> CargoResult<Vec<&Package>> {
+        let mut pending = BTreeMap::new();
+        for &id in ids {
+            let slot = self.packages.get(id).ok_or_else(|| {
+                internal(format!("couldn't find `{}` in package set", id))
+            })?;
+            if slot.borrow().is_none() {
+                let &mut (ref mut pending_ids, ref mut slots) = pending.entry(id.source_id()).or_insert_with(|| (Vec::new(), Vec::new()));
+                pending_ids.push(id);
+                slots.push(slot);
+            }
         }
         let mut sources = self.sources.borrow_mut();
-        let source = sources.get_mut(id.source_id()).ok_or_else(|| {
-            internal(format!("couldn't find source for `{}`", id))
-        })?;
-        let pkg = source.download(id).chain_err(|| {
-            format_err!("unable to get packages from source")
-        })?;
-        assert!(slot.fill(pkg).is_ok());
-        Ok(slot.borrow().unwrap())
+        for (source_id, &(ref pending_ids, ref slots)) in &pending {
+            let source = sources.get_mut(source_id).ok_or_else(|| {
+                internal(format!("couldn't find source `{}`", source_id))
+            })?;
+            let pkgs = source.download(&*pending_ids).chain_err(|| {
+                format_err!("unable to get packages from source")
+            })?;
+            for (pkg, slot) in pkgs.into_iter().zip(slots) {
+                assert!(slot.fill(pkg).is_ok());
+            }
+        }
+
+        Ok(ids.iter().map(|id| self.packages.get(id).unwrap().borrow().unwrap()).collect())
     }

     pub fn sources(&self) -> Ref<SourceMap<'cfg>> {
6 changes: 3 additions & 3 deletions src/cargo/core/source/mod.rs
@@ -20,7 +20,7 @@ pub trait Source: Registry {

     /// The download method fetches the full package for each name and
     /// version specified.
-    fn download(&mut self, package: &PackageId) -> CargoResult<Package>;
+    fn download(&mut self, ids: &[&PackageId]) -> CargoResult<Vec<Package>>;

     /// Generates a unique string which represents the fingerprint of the
     /// current state of the source.
@@ -57,8 +57,8 @@ impl<'a, T: Source + ?Sized + 'a> Source for Box<T> {
     }

     /// Forwards to `Source::download`
-    fn download(&mut self, id: &PackageId) -> CargoResult<Package> {
-        (**self).download(id)
+    fn download(&mut self, ids: &[&PackageId]) -> CargoResult<Vec<Package>> {
+        (**self).download(ids)
     }

     /// Forwards to `Source::fingerprint`
7 changes: 3 additions & 4 deletions src/cargo/ops/cargo_clean.rs
@@ -45,11 +45,10 @@ pub fn clean(ws: &Workspace, opts: &CleanOptions) -> CargoResult<()> {
                              profiles)?;
     let mut units = Vec::new();

-    for spec in opts.spec {
-        // Translate the spec to a Package
-        let pkgid = resolve.query(spec)?;
-        let pkg = packages.get(pkgid)?;
+    let pkg_ids: CargoResult<Vec<_>> = opts.spec.iter().map(|spec| resolve.query(spec)).collect();
+    let pkgs = packages.get(&*pkg_ids?)?;

+    for pkg in pkgs {
         // Generate all relevant `Unit` targets for this package
         for target in pkg.targets() {
             for kind in [Kind::Host, Kind::Target].iter() {
11 changes: 5 additions & 6 deletions src/cargo/ops/cargo_compile.rs
@@ -247,12 +247,11 @@ pub fn compile_ws<'a>(ws: &Workspace<'a>,
     )?;
     let (packages, resolve_with_overrides) = resolve;

-    let to_builds = specs.iter().map(|p| {
-        let pkgid = p.query(resolve_with_overrides.iter())?;
-        let p = packages.get(pkgid)?;
-        p.manifest().print_teapot(ws.config());
-        Ok(p)
-    }).collect::<CargoResult<Vec<_>>>()?;
+    let pkg_ids: CargoResult<Vec<_>> = specs.iter().map(|p| p.query(resolve_with_overrides.iter())).collect();
+    let to_builds = packages.get(&*pkg_ids?)?;
+    for pkg in &to_builds {
+        pkg.manifest().print_teapot(ws.config());
+    }

     let mut general_targets = Vec::new();
     let mut package_targets = Vec::new();
6 changes: 2 additions & 4 deletions src/cargo/ops/cargo_doc.rs
@@ -22,10 +22,8 @@ pub fn doc(ws: &Workspace, options: &DocOptions) -> CargoResult<()> {
                              &specs)?;
     let (packages, resolve_with_overrides) = resolve;

-    let pkgs = specs.iter().map(|p| {
-        let pkgid = p.query(resolve_with_overrides.iter())?;
-        packages.get(pkgid)
-    }).collect::<CargoResult<Vec<_>>>()?;
+    let pkg_ids: CargoResult<Vec<_>> = specs.iter().map(|p| p.query(resolve_with_overrides.iter())).collect();
+    let pkgs = packages.get(&*pkg_ids?)?;

     let mut lib_names = HashMap::new();
     let mut bin_names = HashMap::new();
5 changes: 3 additions & 2 deletions src/cargo/ops/cargo_fetch.rs
@@ -5,8 +5,9 @@ use util::CargoResult;
 /// Executes `cargo fetch`.
 pub fn fetch<'a>(ws: &Workspace<'a>) -> CargoResult<(Resolve, PackageSet<'a>)> {
     let (packages, resolve) = ops::resolve_ws(ws)?;
-    for id in resolve.iter() {
-        packages.get(id)?;
+    {
+        let pkg_ids: Vec<_> = resolve.iter().collect();
+        packages.get(&*pkg_ids)?;
     }
     Ok((resolve, packages))
 }
4 changes: 2 additions & 2 deletions src/cargo/ops/cargo_install.rs
@@ -397,8 +397,8 @@ fn select_pkg<'a, T>(mut source: T,
     let deps = source.query_vec(&dep)?;
     match deps.iter().map(|p| p.package_id()).max() {
         Some(pkgid) => {
-            let pkg = source.download(pkgid)?;
-            Ok((pkg, Box::new(source)))
+            let pkg = source.download(&[pkgid])?;
+            Ok((pkg.into_iter().next().unwrap(), Box::new(source)))
         }
         None => {
             let vers_info = vers.map(|v| format!(" with version `{}`", v))
5 changes: 2 additions & 3 deletions src/cargo/ops/cargo_output_metadata.rs
@@ -54,9 +54,8 @@ fn metadata_full(ws: &Workspace,
                              &specs)?;
     let (packages, resolve) = deps;

-    let packages = packages.package_ids()
-        .map(|i| packages.get(i).map(|p| p.clone()))
-        .collect::<CargoResult<Vec<_>>>()?;
+    let package_ids: Vec<_> = packages.package_ids().collect();
+    let packages: Vec<_> = packages.get(&*package_ids)?.into_iter().cloned().collect();

     Ok(ExportInfo {
         packages,
40 changes: 18 additions & 22 deletions src/cargo/ops/cargo_rustc/context.rs
@@ -780,7 +780,7 @@ impl<'a, 'cfg> Context<'a, 'cfg> {

         let id = unit.pkg.package_id();
         let deps = self.resolve.deps(id);
-        let mut ret = deps.filter(|dep| {
+        let pkg_ids: Vec<_> = deps.filter(|dep| {
             unit.pkg.dependencies().iter().filter(|d| {
                 d.name() == dep.name() && d.version_req().matches(dep.version())
             }).any(|d| {
@@ -814,22 +814,19 @@
                 // actually used!
                 true
             })
-        }).filter_map(|id| {
-            match self.get_package(id) {
-                Ok(pkg) => {
-                    pkg.targets().iter().find(|t| t.is_lib()).map(|t| {
-                        let unit = Unit {
-                            pkg,
-                            target: t,
-                            profile: self.lib_or_check_profile(unit, t),
-                            kind: unit.kind.for_target(t),
-                        };
-                        Ok(unit)
-                    })
-                }
-                Err(e) => Some(Err(e))
-            }
-        }).collect::<CargoResult<Vec<_>>>()?;
+        }).collect();
+
+        let pkgs = self.get_packages(&*pkg_ids)?;
+        let mut ret: Vec<_> = pkgs.into_iter().filter_map(|pkg| {
+            pkg.targets().iter().find(|t| t.is_lib()).map(|t| {
+                Unit {
+                    pkg,
+                    target: t,
+                    profile: self.lib_or_check_profile(unit, t),
+                    kind: unit.kind.for_target(t),
+                }
+            })
+        }).collect();

         // If this target is a build script, then what we've collected so far is
         // all we need. If this isn't a build script, then it depends on the
@@ -913,7 +910,7 @@ impl<'a, 'cfg> Context<'a, 'cfg> {

     /// Returns the dependencies necessary to document a package
     fn doc_deps(&self, unit: &Unit<'a>) -> CargoResult<Vec<Unit<'a>>> {
-        let deps = self.resolve.deps(unit.pkg.package_id()).filter(|dep| {
+        let dep_ids: Vec<_> = self.resolve.deps(unit.pkg.package_id()).filter(|dep| {
             unit.pkg.dependencies().iter().filter(|d| {
                 d.name() == dep.name()
             }).any(|dep| {
@@ ... @@
                     _ => false,
                 }
             })
-        }).map(|dep| {
-            self.get_package(dep)
-        });
+        }).collect();
+
+        let deps = self.get_packages(&*dep_ids)?;

         // To document a library, we depend on dependencies actually being
         // built. If we're documenting *all* libraries, then we also depend on
         // the documentation of the library being built.
         let mut ret = Vec::new();
         for dep in deps {
-            let dep = dep?;
             let lib = match dep.targets().iter().find(|t| t.is_lib()) {
                 Some(lib) => lib,
                 None => continue,
@@ -1007,7 +1003,7 @@ impl<'a, 'cfg> Context<'a, 'cfg> {
     }

     /// Gets a package for the given package id.
-    pub fn get_package(&self, id: &PackageId) -> CargoResult<&'a Package> {
+    pub fn get_packages(&self, id: &[&PackageId]) -> CargoResult<Vec<&'a Package>> {
Member:

So this method is pretty interesting. Long ago Cargo had the signature:

pub fn get_package(&self, id: &PackageId) -> &'a Package;

but we then changed to lazily download packages in order to close #2394. That was the easiest fix to do at the time, but I think it is causing issues with this patch.

I think to get the most benefit out of the scheme (downloading in parallel) we'll probably want to implement a separate pass over Context to collect a list of packages that we're going to want to download. Perhaps that could be deferred to a future PR, though? (Just want to make sure we don't lose track of this.)

Contributor Author:

I'm willing to do some refactoring, thanks for the reference!

Member:

FWIW I've been thinking recently that the end state here may be to "future-ify" everything, where we just create a gob of futures and throw them at tokio to do the whole build, although that's certainly not a refactoring required for this PR!
         self.packages.get(id)
     }

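The "separate pass over Context" suggested in the thread above could be sketched roughly as below, on top of the batched `PackageSet::get(&[&PackageId])` API this PR introduces. The function name `predownload`, its signature, and where it would live are all hypothetical; the point is only the shape of the idea: collect every `PackageId` the build will need up front, then issue one batched download instead of many lazy one-at-a-time calls.

    use core::{PackageId, PackageSet, Resolve};
    use util::CargoResult;

    /// Hypothetical pre-pass (not part of this PR): walk the resolve graph
    /// once, gather every `PackageId` the build will touch, and hand the
    /// whole batch to `PackageSet::get` so sources can download in parallel.
    fn predownload<'a, 'cfg>(resolve: &'a Resolve,
                             packages: &'a PackageSet<'cfg>,
                             roots: &[&'a PackageId]) -> CargoResult<()> {
        let mut needed: Vec<&'a PackageId> = Vec::new();
        let mut stack: Vec<&'a PackageId> = roots.to_vec();
        while let Some(id) = stack.pop() {
            if needed.contains(&id) {
                continue;
            }
            needed.push(id);
            stack.extend(resolve.deps(id));
        }
        // One batched call; `get` fills the per-package slots as a side effect.
        packages.get(&needed)?;
        Ok(())
    }

With something like this run before unit dependencies are computed, `Context::get_packages` would mostly hit already-filled slots instead of triggering downloads in the middle of the build.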
2 changes: 1 addition & 1 deletion src/cargo/ops/cargo_rustc/job_queue.rs
@@ -400,7 +400,7 @@ impl<'a> Key<'a> {
     fn dependencies<'cfg>(&self, cx: &Context<'a, 'cfg>)
                           -> CargoResult<Vec<Key<'a>>> {
         let unit = Unit {
-            pkg: cx.get_package(self.pkg)?,
+            pkg: cx.get_packages(&[self.pkg])?[0],
             target: self.target,
             profile: self.profile,
             kind: self.kind,
9 changes: 5 additions & 4 deletions src/cargo/ops/mod.rs
@@ -16,10 +16,11 @@ pub use self::cargo_generate_lockfile::UpdateOptions;
 pub use self::lockfile::{load_pkg_lockfile, write_pkg_lockfile};
 pub use self::cargo_test::{run_tests, run_benches, TestOptions};
 pub use self::cargo_package::{package, PackageOpts};
-pub use self::registry::{publish, registry_configuration, RegistryConfig};
-pub use self::registry::{registry_login, search, needs_custom_http_transport, http_handle};
-pub use self::registry::{modify_owners, yank, OwnersOptions, PublishOpts};
-pub use self::registry::configure_http_handle;
+pub use self::registry::{publish, registry_configuration, RegistryConfig,
+                         registry_login, search, needs_custom_http_transport,
+                         http_handle, http_easy2_handle, http_multi_handle,
+                         modify_owners, yank, OwnersOptions, PublishOpts,
+                         configure_http_handle, configure_http_easy2_handle};
 pub use self::cargo_fetch::fetch;
 pub use self::cargo_pkgid::pkgid;
 pub use self::resolve::{resolve_ws, resolve_ws_precisely, resolve_ws_with_method, resolve_with_previous};
61 changes: 60 additions & 1 deletion src/cargo/ops/registry.rs
@@ -3,7 +3,8 @@ use std::fs::{self, File};
 use std::iter::repeat;
 use std::time::Duration;

-use curl::easy::{Easy, SslOpt};
+use curl::easy::{Easy, Easy2, Handler, SslOpt, HttpVersion};
+use curl::multi::Multi;
 use git2;
 use registry::{Registry, NewCrate, NewCrateDependency};

@@ -291,6 +292,60 @@ pub fn http_handle(config: &Config) -> CargoResult<Easy> {
     Ok(handle)
 }

+pub fn http_multi_handle(config: &Config) -> CargoResult<Multi> {
+    if config.frozen() {
+        bail!("attempting to make an HTTP request, but --frozen was \
+               specified")
+    }
+    if !config.network_allowed() {
+        bail!("can't make HTTP request in the offline mode")
+    }
+
+    let mut handle = Multi::new();
+    let pipelining = config.get_bool("http.pipelining")?.map(|x| x.val).unwrap_or(true);
+    let multiplexing = config.get_bool("http.multiplexing")?.map(|x| x.val).unwrap_or(true);
+    handle.pipelining(pipelining, multiplexing)?;
+    Ok(handle)
+}
+
+/// Create a curl Easy2 handle with default options
+pub fn http_easy2_handle<H: Handler>(config: &Config, handler: H) -> CargoResult<Easy2<H>> {
+    let mut handle = Easy2::new(handler);
+    configure_http_easy2_handle(config, &mut handle)?;
+    Ok(handle)
+}
+
+pub fn configure_http_easy2_handle<H: Handler>(config: &Config, handle: &mut Easy2<H>) -> CargoResult<()> {
+    // This is a duplicate of configure_http_handle, due to Easy and Easy2
+    // being completely different types.
+    // The timeout option for libcurl by default times out the entire transfer,
+    // but we probably don't want this. Instead we only set timeouts for the
+    // connect phase as well as a "low speed" timeout so if we don't receive
+    // many bytes in a large-ish period of time then we time out.
+    handle.connect_timeout(Duration::new(30, 0))?;
+    handle.low_speed_limit(10 /* bytes per second */)?;
+    handle.low_speed_time(Duration::new(30, 0))?;
+    handle.useragent(&version().to_string())?;
+    // Not all cURL builds support HTTP/2, ignore any errors.
+    if config.get_bool("http.http2")?.map(|x| x.val).unwrap_or(true) {
+        let _ = handle.http_version(HttpVersion::V2TLS);
+    }
+    if let Some(proxy) = http_proxy(config)? {
+        handle.proxy(&proxy)?;
+    }
+    if let Some(cainfo) = config.get_path("http.cainfo")? {
+        handle.cainfo(&cainfo.val)?;
+    }
+    if let Some(check) = config.get_bool("http.check-revoke")? {
+        handle.ssl_options(SslOpt::new().no_revoke(!check.val))?;
+    }
+    if let Some(timeout) = http_timeout(config)? {
+        handle.connect_timeout(Duration::new(timeout as u64, 0))?;
+        handle.low_speed_time(Duration::new(timeout as u64, 0))?;
+    }
+    Ok(())
+}
+
 pub fn needs_custom_http_transport(config: &Config) -> CargoResult<bool> {
     let proxy_exists = http_proxy_exists(config)?;
     let timeout = http_timeout(config)?;
@@ -310,6 +365,10 @@ pub fn configure_http_handle(config: &Config, handle: &mut Easy) -> CargoResult<
     handle.low_speed_limit(10 /* bytes per second */)?;
     handle.low_speed_time(Duration::new(30, 0))?;
     handle.useragent(&version().to_string())?;
+    // Not all cURL builds support HTTP/2, ignore any errors.
+    if config.get_bool("http.http2")?.map(|x| x.val).unwrap_or(true) {
+        let _ = handle.http_version(HttpVersion::V2TLS);
Member:

Should this perhaps emit a warning if an error happens?

Contributor Author:

We have this option turned on by default, so it could be annoying and confusing to those using an old libcurl.

Member:

Oh, good point. I was thinking that if you explicitly configured http2 = true and it wasn't available we may want to warn, but it looks like this only exists for http2 = false, so I think it's ok to not warn here.

+    }
     if let Some(proxy) = http_proxy(config)? {
         handle.proxy(&proxy)?;
     }
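On the warning question in the thread above: if warning on an explicit http2 = true ever becomes desirable, one hypothetical shape for it is sketched below. It uses the `Option` returned by `config.get_bool` to tell "explicitly configured" apart from "defaulted"; the helper name `configure_http2` and the warning text are made up and are not part of this PR.

    use curl::easy::{Easy, HttpVersion};
    use util::{CargoResult, Config};

    /// Hypothetical variant of the `http.http2` handling in
    /// `configure_http_handle`: warn only when the user explicitly asked for
    /// HTTP/2 and the local libcurl build cannot provide it.
    fn configure_http2(config: &Config, handle: &mut Easy) -> CargoResult<()> {
        match config.get_bool("http.http2")? {
            // Explicitly enabled: a failed upgrade is worth a warning.
            Some(http2) if http2.val => {
                if handle.http_version(HttpVersion::V2TLS).is_err() {
                    config.shell().warn("`http.http2` is enabled, but this \
                                         libcurl was built without HTTP/2 \
                                         support")?;
                }
            }
            // Explicitly disabled: leave the handle on HTTP/1.1.
            Some(_) => {}
            // Not configured: best effort, silently ignore missing support.
            None => {
                let _ = handle.http_version(HttpVersion::V2TLS);
            }
        }
        Ok(())
    }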
9 changes: 5 additions & 4 deletions src/cargo/sources/directory.rs
@@ -142,10 +142,11 @@ impl<'cfg> Source for DirectorySource<'cfg> {
         Ok(())
     }

-    fn download(&mut self, id: &PackageId) -> CargoResult<Package> {
-        self.packages.get(id).map(|p| &p.0).cloned().ok_or_else(|| {
-            format_err!("failed to find package with id: {}", id)
-        })
+    fn download(&mut self, ids: &[&PackageId]) -> CargoResult<Vec<Package>> {
+        ids.iter().map(|id|
+            self.packages.get(id).map(|p| &p.0).cloned().ok_or_else(|| {
+                format_err!("failed to find package with id: {}", id)
+            })).collect()
     }

     fn fingerprint(&self, pkg: &Package) -> CargoResult<String> {
12 changes: 7 additions & 5 deletions src/cargo/sources/git/source.rs
@@ -200,12 +200,14 @@ impl<'cfg> Source for GitSource<'cfg> {
         self.path_source.as_mut().unwrap().update()
     }

-    fn download(&mut self, id: &PackageId) -> CargoResult<Package> {
-        trace!("getting packages for package id `{}` from `{:?}`", id,
-               self.remote);
+    fn download(&mut self, ids: &[&PackageId]) -> CargoResult<Vec<Package>> {
+        for id in ids {
+            trace!("getting packages for package id `{}` from `{:?}`", id,
+                   self.remote);
+        }
         self.path_source.as_mut()
-            .expect("BUG: update() must be called before get()")
-            .download(id)
+            .expect("BUG: update() must be called before get()")
+            .download(ids)
     }

     fn fingerprint(&self, _pkg: &Package) -> CargoResult<String> {
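A closing note on how the registry pieces are meant to fit together once the [wip] parts land: `http_multi_handle` builds a curl `Multi` handle (with HTTP/1.1 pipelining and HTTP/2 multiplexing on by default), `http_easy2_handle` builds one configured `Easy2` handle per request, and parallel crate downloads then amount to attaching the `Easy2` handles to the `Multi` handle and turning it until every transfer completes. The sketch below shows only that generic curl-rs pattern, with made-up URLs and none of Cargo's configuration or error reporting, since the registry-side driver is not part of this PR yet.

    use std::time::Duration;

    use curl::easy::{Easy2, Handler, WriteError};
    use curl::multi::Multi;

    /// Collects the response body of a single transfer.
    struct Collector(Vec<u8>);

    impl Handler for Collector {
        fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
            self.0.extend_from_slice(data);
            Ok(data.len())
        }
    }

    fn download_all(urls: &[&str]) -> Result<Vec<Vec<u8>>, Box<dyn std::error::Error>> {
        let multi = Multi::new();
        // One Easy2 handle per crate file, all attached to the same Multi handle.
        let mut transfers = Vec::new();
        for &url in urls {
            let mut easy = Easy2::new(Collector(Vec::new()));
            easy.url(url)?;
            transfers.push(multi.add2(easy)?);
        }
        // Drive all transfers; perform() reports how many are still running.
        while multi.perform()? > 0 {
            multi.wait(&mut [], Duration::from_secs(1))?;
        }
        // Detach each handle and hand back the downloaded bytes.
        let mut bodies = Vec::new();
        for handle in transfers {
            let easy = multi.remove2(handle)?;
            bodies.push(easy.get_ref().0.clone());
        }
        Ok(bodies)
    }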