diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index dd013be3..05e8ac88 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -19,11 +19,9 @@ jobs:
 
     container: quay.io/coreos-assembler/fcos-buildroot:testing-devel
     steps:
-      - name: Install skopeo
-        run: yum -y install skopeo
-      - name: Update ostree
-        run: yum -y --enablerepo=updates-testing update ostree-devel
       - uses: actions/checkout@v2
+      - name: Install deps
+        run: ./ci/installdeps.sh
       - name: Format
        run: cargo fmt -- --check -l
       - name: Build
diff --git a/ci/installdeps.sh b/ci/installdeps.sh
new file mode 100755
index 00000000..606032ed
--- /dev/null
+++ b/ci/installdeps.sh
@@ -0,0 +1,10 @@
+#!/bin/bash
+set -xeuo pipefail
+
+yum -y install skopeo
+yum -y --enablerepo=updates-testing update ostree-devel
+
+git clone --depth=1 https://github.com/cgwalters/container-image-proxy
+cd container-image-proxy
+make
+install -m 0755 bin/container-image-proxy /usr/bin/
diff --git a/lib/Cargo.toml b/lib/Cargo.toml
index 0c2b111e..96028300 100644
--- a/lib/Cargo.toml
+++ b/lib/Cargo.toml
@@ -10,6 +10,7 @@ version = "0.4.0-alpha.0"
 
 [dependencies]
 anyhow = "1.0"
+async-compression = { version = "0.3", features = ["gzip", "tokio"] }
 bytes = "1.0.1"
 bitflags = "1.3.2"
 camino = "1.0.4"
@@ -19,6 +20,7 @@ fn-error-context = "0.2.0"
 futures-util = "0.3.13"
 gvariant = "0.4.0"
 hex = "0.4.3"
+hyper = { version = "0.14", features = ["full"] }
 indicatif = "0.16.0"
 lazy_static = "1.4.0"
 libc = "0.2.92"
diff --git a/lib/src/cli.rs b/lib/src/cli.rs
index 6a595ab7..2b15ea4e 100644
--- a/lib/src/cli.rs
+++ b/lib/src/cli.rs
@@ -73,6 +73,10 @@ enum ContainerOpts {
         /// Create an ostree ref pointing to the imported commit
         #[structopt(long)]
         write_ref: Option<String>,
+
+        /// Don't display progress
+        #[structopt(long)]
+        quiet: bool,
     },
 
     /// Print information about an exported ostree-container image.
@@ -155,17 +159,27 @@ fn tar_export(opts: &ExportOpts) -> Result<()> {
 }
 
 /// Import a container image with an encapsulated ostree commit.
-async fn container_import(repo: &str, imgref: &str, write_ref: Option<&str>) -> Result<()> {
+async fn container_import(
+    repo: &str,
+    imgref: &str,
+    write_ref: Option<&str>,
+    quiet: bool,
+) -> Result<()> {
     let repo = &ostree::Repo::open_at(libc::AT_FDCWD, repo, gio::NONE_CANCELLABLE)?;
     let imgref = imgref.try_into()?;
     let (tx_progress, rx_progress) = tokio::sync::watch::channel(Default::default());
     let target = indicatif::ProgressDrawTarget::stdout();
     let style = indicatif::ProgressStyle::default_bar();
-    let pb = indicatif::ProgressBar::new_spinner();
-    pb.set_draw_target(target);
-    pb.set_style(style.template("{spinner} {prefix} {msg}"));
-    pb.enable_steady_tick(200);
-    pb.set_message("Downloading...");
+    let pb = if !quiet {
+        let pb = indicatif::ProgressBar::new_spinner();
+        pb.set_draw_target(target);
+        pb.set_style(style.template("{spinner} {prefix} {msg}"));
+        pb.enable_steady_tick(200);
+        pb.set_message("Downloading...");
+        Some(pb)
+    } else {
+        None
+    };
     let opts = ImportOptions {
         progress: Some(tx_progress),
     };
@@ -176,10 +190,14 @@ async fn container_import(repo: &str, imgref: &str, write_ref: Option<&str>) ->
         tokio::select! {
             _ = rx_progress.changed() => {
                 let n = rx_progress.borrow().processed_bytes;
-                pb.set_message(format!("Processed: {}", indicatif::HumanBytes(n)));
+                if let Some(pb) = pb.as_ref() {
+                    pb.set_message(format!("Processed: {}", indicatif::HumanBytes(n)));
+                }
             }
             import = &mut import => {
-                pb.finish();
+                if let Some(pb) = pb.as_ref() {
+                    pb.finish();
+                }
                 break import?;
             }
         }
@@ -266,7 +284,8 @@ where
             repo,
            imgref,
             write_ref,
-        }) => container_import(&repo, &imgref, write_ref.as_deref()).await,
+            quiet,
+        }) => container_import(&repo, &imgref, write_ref.as_deref(), quiet).await,
         Opt::Container(ContainerOpts::Export {
             repo,
             rev,
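The `cli.rs` change above reduces to one pattern: the progress bar becomes an `Option<ProgressBar>`, so every reporting site degrades to a no-op under `--quiet`. A minimal, self-contained sketch of that pattern, assuming tokio and indicatif 0.16 as pinned in `Cargo.toml`; `fake_download` and `run` are hypothetical stand-ins for the real import future:

```rust
use std::time::Duration;

// Hypothetical stand-in for the real import future; it just reports a
// byte counter over a watch channel, like ImportProgress does.
async fn fake_download(tx: tokio::sync::watch::Sender<u64>) -> anyhow::Result<()> {
    for n in 1..=5u64 {
        tokio::time::sleep(Duration::from_millis(100)).await;
        let _ = tx.send(n * 1024 * 1024);
    }
    Ok(())
}

async fn run(quiet: bool) -> anyhow::Result<()> {
    let (tx, mut rx) = tokio::sync::watch::channel(0u64);
    // Under --quiet we simply never construct the bar; every use below
    // then becomes a no-op via Option.
    let pb = (!quiet).then(|| {
        let pb = indicatif::ProgressBar::new_spinner();
        pb.enable_steady_tick(200);
        pb.set_message("Downloading...");
        pb
    });
    let import = fake_download(tx);
    tokio::pin!(import);
    loop {
        tokio::select! {
            _ = rx.changed() => {
                if let Some(pb) = pb.as_ref() {
                    pb.set_message(format!("Processed: {}", indicatif::HumanBytes(*rx.borrow())));
                }
            }
            r = &mut import => {
                if let Some(pb) = pb.as_ref() {
                    pb.finish();
                }
                break r;
            }
        }
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    run(false).await
}
```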
diff --git a/lib/src/cmdext.rs b/lib/src/cmdext.rs
new file mode 100644
index 00000000..47a8d1d9
--- /dev/null
+++ b/lib/src/cmdext.rs
@@ -0,0 +1,19 @@
+use std::os::unix::prelude::{AsRawFd, CommandExt, RawFd};
+
+pub(crate) trait CommandRedirectionExt {
+    fn take_fd_n(&mut self, fd: i32, target: i32) -> &mut Self;
+}
+
+#[allow(unsafe_code)]
+impl CommandRedirectionExt for std::process::Command {
+    fn take_fd_n(&mut self, fd: i32, target: i32) -> &mut Self {
+        unsafe {
+            self.pre_exec(move || {
+                nix::unistd::dup2(fd, target as RawFd)
+                    .map(|_r| ())
+                    .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{}", e)))
+            });
+        }
+        self
+    }
+}
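The new `cmdext.rs` helper runs `dup2(2)` in a `pre_exec` hook so the child sees the descriptor at a fixed number; `dup2` also leaves the duplicate without `O_CLOEXEC`, which is what lets the normally close-on-exec socket survive the exec. A hedged usage sketch, not part of the patch, assuming crate-internal access to `CommandRedirectionExt` plus the `anyhow` and `nix` dependencies:

```rust
use crate::cmdext::CommandRedirectionExt;
use std::io::Write;
use std::os::unix::net::UnixStream;
use std::os::unix::prelude::AsRawFd;

fn demo() -> anyhow::Result<()> {
    let (mut mine, childs) = UnixStream::pair()?;
    let mut c = std::process::Command::new("sh");
    // The child reads from fd 3 whatever we write into our half of the
    // socketpair -- the same contract ImageProxy::new sets up below.
    c.args(&["-c", "cat <&3"]);
    c.take_fd_n(childs.as_raw_fd(), 3);
    let mut child = c.spawn()?;
    drop(childs); // the child owns its own copy now
    mine.write_all(b"hello over fd 3\n")?;
    drop(mine); // EOF, so cat exits
    child.wait()?;
    Ok(())
}
```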
diff --git a/lib/src/container/import.rs b/lib/src/container/import.rs
index 56efb959..17f70172 100644
--- a/lib/src/container/import.rs
+++ b/lib/src/container/import.rs
@@ -30,10 +30,7 @@
 
 use super::*;
 use anyhow::{anyhow, Context};
-use camino::Utf8Path;
 use fn_error_context::context;
-use futures_util::{Future, FutureExt, TryFutureExt};
-use std::io::prelude::*;
 use std::pin::Pin;
 use std::process::Stdio;
 use tokio::io::AsyncRead;
@@ -107,158 +104,6 @@ pub async fn fetch_manifest(imgref: &OstreeImageReference) -> Result<(Vec<u8>, S
     Ok((raw_manifest, digest))
 }
 
-/// Read the contents of the first .tar we find.
-/// The first return value is an `AsyncRead` of that tar file.
-/// The second return value is a background worker task that
-/// owns stream processing.
-pub async fn find_layer_tar(
-    src: impl AsyncRead + Send + Unpin + 'static,
-    blobid: &str,
-) -> Result<(impl AsyncRead, impl Future<Output = Result<()>>)> {
-    // Convert the async input stream to synchronous, becuase we currently use the
-    // sync tar crate.
-    let pipein = crate::async_util::async_read_to_sync(src);
-    // An internal channel of Bytes
-    let (tx_buf, rx_buf) = tokio::sync::mpsc::channel(2);
-    let blob_sha256 = blobid
-        .strip_prefix("sha256:")
-        .ok_or_else(|| anyhow!("Expected sha256: in digest: {}", blobid))?;
-    let blob_symlink_target = format!("../{}.tar", blob_sha256);
-    let worker = tokio::task::spawn_blocking(move || {
-        let mut pipein = pipein;
-        let r =
-            find_layer_tar_sync(&mut pipein, blob_symlink_target, tx_buf).context("Import worker");
-        // Ensure we read the entirety of the stream, otherwise skopeo will get an EPIPE.
-        let _ = std::io::copy(&mut pipein, &mut std::io::sink());
-        r
-    })
-    .map_err(anyhow::Error::msg);
-    // Bridge the channel to an AsyncRead
-    let stream = tokio_stream::wrappers::ReceiverStream::new(rx_buf);
-    let reader = tokio_util::io::StreamReader::new(stream);
-    Ok((reader, worker))
-}
-
-// Helper function invoked to synchronously parse a `docker-archive:` formatted tar stream, finding
-// the desired layer tarball and writing its contents via a stream of byte chunks
-// to a channel.
-fn find_layer_tar_sync(
-    pipein: impl Read + Send + Unpin,
-    blob_symlink_target: String,
-    tx_buf: tokio::sync::mpsc::Sender<std::io::Result<bytes::Bytes>>,
-) -> Result<()> {
-    let mut archive = tar::Archive::new(pipein);
-    let mut buf = vec![0u8; 8192];
-    let mut found = false;
-    for entry in archive.entries()? {
-        let mut entry = entry.context("Reading entry")?;
-        if found {
-            // Continue to read to the end to avoid broken pipe error from skopeo
-            continue;
-        }
-        let path = entry.path()?;
-        let path: &Utf8Path = path.deref().try_into()?;
-        // We generally expect our layer to be first, but let's just skip anything
-        // unexpected to be robust against changes in skopeo.
-        if path.extension() != Some("tar") {
-            continue;
-        }
-        event!(Level::DEBUG, "Found {}", path);
-
-        match entry.header().entry_type() {
-            tar::EntryType::Symlink => {
-                if let Some(name) = path.file_name() {
-                    if name == "layer.tar" {
-                        let target = entry
-                            .link_name()?
-                            .ok_or_else(|| anyhow!("Invalid link {}", path))?;
-                        let target = Utf8Path::from_path(&*target)
-                            .ok_or_else(|| anyhow!("Invalid non-UTF8 path {:?}", target))?;
-                        if target != blob_symlink_target {
-                            return Err(anyhow!(
-                                "Found unexpected layer link {} -> {}",
-                                path,
-                                target
-                            ));
-                        }
-                    }
-                }
-            }
-            tar::EntryType::Regular => loop {
-                let n = entry
-                    .read(&mut buf[..])
-                    .context("Reading tar file contents")?;
-                let done = 0 == n;
-                let r = Ok::<_, std::io::Error>(bytes::Bytes::copy_from_slice(&buf[0..n]));
-                let receiver_closed = tx_buf.blocking_send(r).is_err();
-                if receiver_closed || done {
-                    found = true;
-                    break;
-                }
-            },
-            _ => continue,
-        }
-    }
-    if found {
-        Ok(())
-    } else {
-        Err(anyhow!("Failed to find layer {}", blob_symlink_target))
-    }
-}
-
-/// Fetch a remote docker/OCI image and extract a specific uncompressed layer.
-async fn fetch_layer<'s>(
-    imgref: &OstreeImageReference,
-    blobid: &str,
-    progress: Option<tokio::sync::watch::Sender<ImportProgress>>,
-) -> Result<(
-    impl AsyncRead + Unpin + Send,
-    impl Future<Output = Result<()>>,
-)> {
-    let mut proc = skopeo::new_cmd();
-    proc.stdout(Stdio::null());
-    let tempdir = tempfile::Builder::new()
-        .prefix("ostree-rs-ext")
-        .tempdir_in("/var/tmp")?;
-    let tempdir = Utf8Path::from_path(tempdir.path()).unwrap();
-    let fifo = &tempdir.join("skopeo.pipe");
-    nix::unistd::mkfifo(
-        fifo.as_os_str(),
-        nix::sys::stat::Mode::from_bits(0o600).unwrap(),
-    )?;
-    tracing::trace!("skopeo pull starting to {}", fifo);
-    proc.arg("copy")
-        .arg(imgref.imgref.to_string())
-        .arg(format!("docker-archive:{}", fifo));
-    let proc = skopeo::spawn(proc)?;
-    let fifo_reader = ProgressReader {
-        reader: Box::new(tokio::fs::File::open(fifo).await?),
-        progress,
-    };
-    let waiter = async move {
-        let res = proc.wait_with_output().await?;
-        if !res.status.success() {
-            return Err(anyhow!(
-                "skopeo failed: {}\n{}",
-                res.status,
-                String::from_utf8_lossy(&res.stderr)
-            ));
-        }
-        Ok(())
-    }
-    .boxed();
-    let (contents, worker) = find_layer_tar(fifo_reader, blobid).await?;
-    // This worker task joins the result of the stream processing thread with monitoring the skopeo process.
-    let worker = async move {
-        let (worker, waiter) = tokio::join!(worker, waiter);
-        // Explicitly declare as `()` to verify we have the right number of `?`.
-        let _: () = waiter?;
-        let _: () = worker??;
-        Ok::<_, anyhow::Error>(())
-    };
-    Ok((contents, worker))
-}
-
 /// The result of an import operation
 #[derive(Debug)]
 pub struct Import {
@@ -298,7 +143,8 @@ pub async fn import(
     imgref: &OstreeImageReference,
     options: Option<ImportOptions>,
 ) -> Result<Import> {
-    let (manifest, image_digest) = fetch_manifest(imgref).await?;
+    let mut proxy = skopeo::ImageProxy::new(&imgref.imgref).await?;
+    let (image_digest, manifest) = proxy.fetch_manifest().await?;
     let ostree_commit = import_from_manifest(repo, imgref, &manifest, options).await?;
     Ok(Import {
         ostree_commit,
@@ -324,19 +170,23 @@ pub async fn import_from_manifest(
     let manifest: oci::Manifest = serde_json::from_slice(manifest_bytes)?;
     let layerid = require_one_layer_blob(&manifest)?;
     event!(Level::DEBUG, "target blob: {}", layerid);
-    let (blob, worker) = fetch_layer(imgref, layerid, options.progress).await?;
-    let blob = tokio::io::BufReader::new(blob);
+    let mut proxy = skopeo::ImageProxy::new(&imgref.imgref).await?;
+    let blob = proxy.fetch_blob(layerid).await?;
+    let blob = async_compression::tokio::bufread::GzipDecoder::new(blob);
+    let blob = ProgressReader {
+        reader: Box::new(blob),
+        progress: options.progress,
+    };
     let mut taropts: crate::tar::TarImportOptions = Default::default();
     match &imgref.sigverify {
         SignatureSource::OstreeRemote(remote) => taropts.remote = Some(remote.clone()),
         SignatureSource::ContainerPolicy | SignatureSource::ContainerPolicyAllowInsecure => {}
     }
-    let import = crate::tar::import_tar(repo, blob, Some(taropts));
-    let (ostree_commit, worker) = tokio::join!(import, worker);
-    // Let any errors from skopeo take precedence, because a failure to parse/find the layer tarball
-    // is likely due to an underlying error from that.
-    let _: () = worker?;
-    let ostree_commit = ostree_commit?;
+    let ostree_commit = crate::tar::import_tar(repo, blob, Some(taropts))
+        .await
+        .with_context(|| format!("Parsing blob {}", layerid))?;
+    // FIXME write ostree commit after proxy finalization
+    proxy.finalize().await?;
     event!(Level::DEBUG, "created commit {}", ostree_commit);
     Ok(ostree_commit)
 }
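Since the proxy serves blobs as stored, decompression moves client-side: `import_from_manifest` now layers a `GzipDecoder` over the proxy's `AsyncBufRead` body, then wraps it in `ProgressReader` so byte counts still flow to the watch channel, before handing it to the tar importer. A standalone sketch of just the decode step, with an in-memory gzip stream standing in for `proxy.fetch_blob()` (assumes the `async-compression` feature flags added to Cargo.toml above; `flate2` is used only to fabricate test data):

```rust
use async_compression::tokio::bufread::GzipDecoder;
use std::io::Write;
use tokio::io::{AsyncReadExt, BufReader};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Stand-in for proxy.fetch_blob(): any AsyncBufRead yielding gzip bytes.
    let mut enc = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
    enc.write_all(b"pretend this is the layer tarball")?;
    let gzipped = enc.finish()?;

    // GzipDecoder consumes an AsyncBufRead and yields the uncompressed
    // stream; the real code hands this on to crate::tar::import_tar.
    let mut blob = GzipDecoder::new(BufReader::new(&gzipped[..]));
    let mut out = Vec::new();
    blob.read_to_end(&mut out).await?;
    assert_eq!(out, b"pretend this is the layer tarball");
    Ok(())
}
```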
diff --git a/lib/src/container/skopeo.rs b/lib/src/container/skopeo.rs
index 476ee9b7..dbd72a9c 100644
--- a/lib/src/container/skopeo.rs
+++ b/lib/src/container/skopeo.rs
@@ -1,9 +1,18 @@
-//! Fork skopeo as a subprocess
+//! Fork skopeo and/or container-image-proxy as a subprocess
 
-use super::Result;
+use crate::cmdext::CommandRedirectionExt;
+
+use super::{ImageReference, Result};
 use anyhow::Context;
+use futures_util::{Future, FutureExt, TryFutureExt, TryStreamExt};
+use hyper::body::HttpBody;
+use hyper::client::conn::{Builder, SendRequest};
+use hyper::{Body, Request, StatusCode};
 use serde::Deserialize;
+use std::os::unix::prelude::AsRawFd;
+use std::pin::Pin;
 use std::process::Stdio;
+use tokio::io::{AsyncBufRead, AsyncReadExt, AsyncWriteExt};
 use tokio::process::Command;
 
 // See `man containers-policy.json` and
@@ -84,6 +93,125 @@ pub(crate) fn spawn(mut cmd: Command) -> Result<tokio::process::Child> {
     cmd.spawn().context("Failed to exec skopeo")
 }
 
+pub(crate) struct ImageProxy {
+    proc: tokio::process::Child,
+    request_sender: SendRequest<Body>,
+    stderr: Pin<Box<dyn Future<Output = Result<Result<String>>>>>,
+    driver: Pin<Box<dyn Future<Output = Result<Result<()>>>>>,
+}
+
+impl ImageProxy {
+    pub(crate) async fn new(imgref: &ImageReference) -> Result<Self> {
+        // Communicate over an anonymous socketpair(2)
+        let (mysock, childsock) = tokio::net::UnixStream::pair()?;
+        let childsock = childsock.into_std()?;
+        let mut c = std::process::Command::new("container-image-proxy");
+        c.arg(&imgref.to_string());
+        c.stdout(Stdio::null()).stderr(Stdio::piped());
+        if let Some(port) = std::env::var_os("OSTREE_IMAGE_PROXY_PORT") {
+            c.arg("--port");
+            c.arg(port);
+        } else {
+            // Pass one half of the pair as fd 3 to the child
+            let target_fd = "3";
+            c.args(&["--sockfd", target_fd])
+                .take_fd_n(childsock.as_raw_fd(), target_fd.parse().unwrap());
+        }
+        let mut c = tokio::process::Command::from(c);
+        c.kill_on_drop(true);
+        let mut proc = c.spawn()?;
+        // We've passed over the fd, close it.
+        drop(childsock);
+
+        let mut child_stderr = proc.stderr.take().unwrap();
+
+        // Connect via HTTP to the child
+        let (request_sender, connection) = Builder::new().handshake::<_, Body>(mysock).await?;
+        // Background driver that manages things like timeouts.
+        let driver = tokio::spawn(connection.map_err(anyhow::Error::msg))
+            .map_err(anyhow::Error::msg)
+            .boxed();
+        let stderr = tokio::spawn(async move {
+            let mut buf = String::new();
+            child_stderr.read_to_string(&mut buf).await?;
+            Ok(buf)
+        })
+        .map_err(anyhow::Error::msg)
+        .boxed();
+        Ok(Self {
+            proc,
+            stderr,
+            request_sender,
+            driver,
+        })
+    }
+
+    pub(crate) async fn fetch_manifest(&mut self) -> Result<(String, Vec<u8>)> {
+        let req = Request::builder()
+            .header("Host", "localhost")
+            .method("GET")
+            .uri("/manifest")
+            .body(Body::from(""))?;
+        let mut resp = self.request_sender.send_request(req).await?;
+        if resp.status() != StatusCode::OK {
+            return Err(anyhow::anyhow!("error from proxy: {}", resp.status()));
+        }
+        let hname = "Manifest-Digest";
+        let digest = resp
+            .headers()
+            .get(hname)
+            .ok_or_else(|| anyhow::anyhow!("Missing {} header", hname))?
+            .to_str()
+            .with_context(|| format!("Invalid {} header", hname))?
+            .to_string();
+        let mut ret = Vec::new();
+        while let Some(chunk) = resp.body_mut().data().await {
+            let chunk = chunk?;
+            ret.extend_from_slice(&chunk);
+        }
+        Ok((digest, ret))
+    }
+
+    pub(crate) async fn fetch_blob(
+        &mut self,
+        digest: &str,
+    ) -> Result<impl AsyncBufRead + Send + Unpin> {
+        let uri = format!("/blobs/{}", digest);
+        let req = Request::builder()
+            .header("Host", "localhost")
+            .method("GET")
+            .uri(&uri)
+            .body(Body::from(""))?;
+        let resp = self.request_sender.send_request(req).await?;
+        let status = resp.status();
+        let body = TryStreamExt::map_err(resp.into_body(), |e| {
+            std::io::Error::new(std::io::ErrorKind::Other, e)
+        });
+        let mut body = tokio_util::io::StreamReader::new(body);
+        if status != StatusCode::OK {
+            let mut s = String::new();
+            let _: usize = body.read_to_string(&mut s).await?;
+            return Err(anyhow::anyhow!("error from proxy: {}: {}", status, s));
+        }
+        Ok(body)
+    }
+
+    pub(crate) async fn finalize(mut self) -> Result<()> {
+        // For now discard any errors from the connection
+        drop(self.request_sender);
+        let _r = self.driver.await??;
+        let status = self.proc.wait().await?;
+        if !status.success() {
+            if let Some(stderr) = self.stderr.await.map(|v| v.ok()).ok().flatten() {
+                anyhow::bail!("proxy failed: {}\n{}", status, stderr)
+            } else {
+                anyhow::bail!("proxy failed: {} (failed to fetch stderr)", status)
+            }
+        }
+        Ok(())
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
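The proxy speaks plain HTTP/1.1 over the socketpair: `GET /manifest` answers with a `Manifest-Digest` header plus the manifest bytes, and `GET /blobs/<digest>` streams a blob. To make that wire contract concrete, here is a hedged round-trip sketch in which a tiny hyper server stands in for container-image-proxy (assumes hyper 0.14 with the "full" feature, per Cargo.toml; the digest value is made up):

```rust
use hyper::service::service_fn;
use hyper::{Body, Request, Response};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (client_sock, server_sock) = tokio::net::UnixStream::pair()?;

    // Fake proxy: answer every request the way GET /manifest would be answered.
    tokio::spawn(async move {
        let svc = service_fn(|_req: Request<Body>| async {
            Ok::<_, hyper::Error>(
                Response::builder()
                    .header("Manifest-Digest", "sha256:deadbeef")
                    .body(Body::from(r#"{"schemaVersion": 2}"#))
                    .unwrap(),
            )
        });
        let _ = hyper::server::conn::Http::new()
            .serve_connection(server_sock, svc)
            .await;
    });

    // Client half, the same shape as ImageProxy::fetch_manifest above.
    let (mut sender, conn) = hyper::client::conn::Builder::new()
        .handshake::<_, Body>(client_sock)
        .await?;
    tokio::spawn(conn); // drives the connection in the background
    let req = Request::builder()
        .header("Host", "localhost")
        .uri("/manifest")
        .body(Body::from(""))?;
    let resp = sender.send_request(req).await?;
    assert_eq!(resp.headers()["Manifest-Digest"], "sha256:deadbeef");
    let body = hyper::body::to_bytes(resp.into_body()).await?;
    println!("manifest: {}", String::from_utf8_lossy(&body));
    Ok(())
}
```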
diff --git a/lib/src/lib.rs b/lib/src/lib.rs
index 3137cb65..3b964dc8 100644
--- a/lib/src/lib.rs
+++ b/lib/src/lib.rs
@@ -27,6 +27,8 @@ pub mod container;
 pub mod diff;
 pub mod ima;
 pub mod tar;
+
+mod cmdext;
 /// Prelude, intended for glob import.
 pub mod prelude {
     #[doc(hidden)]
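End to end, the new path is exercised the same way as before from library code. A hedged sketch assuming this crate's public surface (`ostree_ext::container::{import, OstreeImageReference}`) and an already-initialized repo at `./repo`; the image reference is only an example:

```rust
use ostree_ext::container::OstreeImageReference;
use std::convert::TryFrom;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let repo = ostree::Repo::open_at(libc::AT_FDCWD, "repo", gio::NONE_CANCELLABLE)?;
    let imgref = OstreeImageReference::try_from(
        "ostree-unverified-image:docker://quay.io/exampleos/exampleos:latest",
    )?;
    // Passing no ImportOptions means no progress channel -- the
    // library-level equivalent of the new --quiet flag.
    let imported = ostree_ext::container::import(&repo, &imgref, None).await?;
    println!("imported commit: {}", imported.ostree_commit);
    Ok(())
}
```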