WIP: Fetch via container-image-proxy
https://github.com/cgwalters/container-image-proxy
is prototype code to expose containers/image via an HTTP API
suitable for use in non-Go programs.

This has many advantages over forking skopeo ourselves; the key one
is on-demand layer fetching.
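
In rough outline, the new import path looks like the sketch below: a
condensed, illustrative rendering of the calls introduced in
lib/src/container/import.rs further down. The ImageProxy wrapper itself
lives in the crate's skopeo module (not shown in this excerpt), and
error handling, signature-verification options, and the progress
reporting are elided.

// Illustrative sketch only; the names follow the diff in this commit.
async fn fetch_via_proxy(
    repo: &ostree::Repo,
    imgref: &OstreeImageReference,
) -> anyhow::Result<String> {
    // Spawn container-image-proxy as a child process for this image reference.
    let mut proxy = skopeo::ImageProxy::new(&imgref.imgref).await?;
    // Fetch the manifest over the proxy's HTTP API.
    let (_digest, manifest) = proxy.fetch_manifest().await?;
    let manifest: oci::Manifest = serde_json::from_slice(&manifest)?;
    let layerid = require_one_layer_blob(&manifest)?;
    // Fetch just the one layer blob we need, decompressing it as it streams.
    let blob = proxy.fetch_blob(layerid).await?;
    let blob = async_compression::tokio::bufread::GzipDecoder::new(blob);
    // Import the tar stream into the ostree repo, then shut the proxy down.
    let ostree_commit = crate::tar::import_tar(repo, blob, None).await?;
    proxy.finalize().await?;
    Ok(ostree_commit)
}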
cgwalters committed Sep 23, 2021
1 parent c75691e commit 9e47e34
Showing 8 changed files with 207 additions and 179 deletions.
6 changes: 2 additions & 4 deletions .github/workflows/rust.yml
@@ -19,11 +19,9 @@ jobs:
container: quay.io/coreos-assembler/fcos-buildroot:testing-devel

steps:
- name: Install skopeo
run: yum -y install skopeo
- name: Update ostree
run: yum -y --enablerepo=updates-testing update ostree-devel
- uses: actions/checkout@v2
- name: Install deps
run: ./ci/installdeps.sh
- name: Format
run: cargo fmt -- --check -l
- name: Build
10 changes: 10 additions & 0 deletions ci/installdeps.sh
@@ -0,0 +1,10 @@
#!/bin/bash
set -xeuo pipefail

yum -y install skopeo
yum -y --enablerepo=updates-testing update ostree-devel

git clone --depth=1 https://github.com/cgwalters/container-image-proxy
cd container-image-proxy
make
install -m 0755 bin/container-image-proxy /usr/bin/
2 changes: 2 additions & 0 deletions lib/Cargo.toml
@@ -10,6 +10,7 @@ version = "0.4.0-alpha.0"

[dependencies]
anyhow = "1.0"
async-compression = { version = "0.3", features = ["gzip", "tokio"] }
bytes = "1.0.1"
bitflags = "1.3.2"
camino = "1.0.4"
@@ -19,6 +20,7 @@ fn-error-context = "0.2.0"
futures-util = "0.3.13"
gvariant = "0.4.0"
hex = "0.4.3"
hyper = { version = "0.14", features = ["full"] }
indicatif = "0.16.0"
lazy_static = "1.4.0"
libc = "0.2.92"
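The two new dependencies back the proxy-based fetch path introduced below:
async-compression (with the gzip and tokio features) stream-decompresses the
gzipped layer blob returned by the proxy, and hyper presumably serves as the
HTTP client side of the ImageProxy wrapper (that module is not part of this
excerpt). A minimal sketch of the decompression pattern, assuming any tokio
AsyncRead as input:

use async_compression::tokio::bufread::GzipDecoder;
use tokio::io::{AsyncReadExt, BufReader};

// Stream-decompress gzip data without buffering the whole blob in memory.
async fn read_gzipped(src: impl tokio::io::AsyncRead + Unpin) -> anyhow::Result<Vec<u8>> {
    let mut decoder = GzipDecoder::new(BufReader::new(src));
    let mut out = Vec::new();
    decoder.read_to_end(&mut out).await?;
    Ok(out)
}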
37 changes: 28 additions & 9 deletions lib/src/cli.rs
@@ -73,6 +73,10 @@ enum ContainerOpts {
/// Create an ostree ref pointing to the imported commit
#[structopt(long)]
write_ref: Option<String>,

/// Don't display progress
#[structopt(long)]
quiet: bool,
},

/// Print information about an exported ostree-container image.
@@ -155,17 +159,27 @@ fn tar_export(opts: &ExportOpts) -> Result<()> {
}

/// Import a container image with an encapsulated ostree commit.
async fn container_import(repo: &str, imgref: &str, write_ref: Option<&str>) -> Result<()> {
async fn container_import(
repo: &str,
imgref: &str,
write_ref: Option<&str>,
quiet: bool,
) -> Result<()> {
let repo = &ostree::Repo::open_at(libc::AT_FDCWD, repo, gio::NONE_CANCELLABLE)?;
let imgref = imgref.try_into()?;
let (tx_progress, rx_progress) = tokio::sync::watch::channel(Default::default());
let target = indicatif::ProgressDrawTarget::stdout();
let style = indicatif::ProgressStyle::default_bar();
let pb = indicatif::ProgressBar::new_spinner();
pb.set_draw_target(target);
pb.set_style(style.template("{spinner} {prefix} {msg}"));
pb.enable_steady_tick(200);
pb.set_message("Downloading...");
let pb = if !quiet {
let pb = indicatif::ProgressBar::new_spinner();
pb.set_draw_target(target);
pb.set_style(style.template("{spinner} {prefix} {msg}"));
pb.enable_steady_tick(200);
pb.set_message("Downloading...");
Some(pb)
} else {
None
};
let opts = ImportOptions {
progress: Some(tx_progress),
};
@@ -176,10 +190,14 @@ async fn container_import(repo: &str, imgref: &str, write_ref: Option<&str>) ->
tokio::select! {
_ = rx_progress.changed() => {
let n = rx_progress.borrow().processed_bytes;
pb.set_message(format!("Processed: {}", indicatif::HumanBytes(n)));
if let Some(pb) = pb.as_ref() {
pb.set_message(format!("Processed: {}", indicatif::HumanBytes(n)));
}
}
import = &mut import => {
pb.finish();
if let Some(pb) = pb.as_ref() {
pb.finish();
}
break import?;
}
}
@@ -266,7 +284,8 @@ where
repo,
imgref,
write_ref,
}) => container_import(&repo, &imgref, write_ref.as_deref()).await,
quiet,
}) => container_import(&repo, &imgref, write_ref.as_deref(), quiet).await,
Opt::Container(ContainerOpts::Export {
repo,
rev,
19 changes: 19 additions & 0 deletions lib/src/cmdext.rs
@@ -0,0 +1,19 @@
use std::os::unix::prelude::{AsRawFd, CommandExt, RawFd};

pub(crate) trait CommandRedirectionExt {
fn take_fd_n(&mut self, fd: i32, target: i32) -> &mut Self;
}

#[allow(unsafe_code)]
impl CommandRedirectionExt for std::process::Command {
fn take_fd_n(&mut self, fd: i32, target: i32) -> &mut Self {
unsafe {
self.pre_exec(move || {
nix::unistd::dup2(fd, target as RawFd)
.map(|_r| ())
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{}", e)))
});
}
self
}
}
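
A hypothetical call site for this helper (not part of this commit; the trait
is crate-private, so this assumes a caller inside the crate): hand a child
process an already-open descriptor on a fixed fd number.

use std::os::unix::io::AsRawFd;
use std::process::Command;

use crate::cmdext::CommandRedirectionExt;

// Give the child our open file as its fd 3. The dup2 runs in pre_exec, so the
// duplicated descriptor is not close-on-exec and survives into the child.
fn spawn_with_fd3() -> std::io::Result<std::process::ExitStatus> {
    let src = std::fs::File::open("/etc/os-release")?;
    Command::new("cat")
        .arg("/proc/self/fd/3")
        .take_fd_n(src.as_raw_fd(), 3)
        .status()
}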
178 changes: 14 additions & 164 deletions lib/src/container/import.rs
@@ -30,10 +30,7 @@

use super::*;
use anyhow::{anyhow, Context};
use camino::Utf8Path;
use fn_error_context::context;
use futures_util::{Future, FutureExt, TryFutureExt};
use std::io::prelude::*;
use std::pin::Pin;
use std::process::Stdio;
use tokio::io::AsyncRead;
@@ -107,158 +104,6 @@ pub async fn fetch_manifest(imgref: &OstreeImageReference) -> Result<(Vec<u8>, S
Ok((raw_manifest, digest))
}

/// Read the contents of the first <checksum>.tar we find.
/// The first return value is an `AsyncRead` of that tar file.
/// The second return value is a background worker task that
/// owns stream processing.
pub async fn find_layer_tar(
src: impl AsyncRead + Send + Unpin + 'static,
blobid: &str,
) -> Result<(impl AsyncRead, impl Future<Output = Result<Result<()>>>)> {
// Convert the async input stream to synchronous, because we currently use the
// sync tar crate.
let pipein = crate::async_util::async_read_to_sync(src);
// An internal channel of Bytes
let (tx_buf, rx_buf) = tokio::sync::mpsc::channel(2);
let blob_sha256 = blobid
.strip_prefix("sha256:")
.ok_or_else(|| anyhow!("Expected sha256: in digest: {}", blobid))?;
let blob_symlink_target = format!("../{}.tar", blob_sha256);
let worker = tokio::task::spawn_blocking(move || {
let mut pipein = pipein;
let r =
find_layer_tar_sync(&mut pipein, blob_symlink_target, tx_buf).context("Import worker");
// Ensure we read the entirety of the stream, otherwise skopeo will get an EPIPE.
let _ = std::io::copy(&mut pipein, &mut std::io::sink());
r
})
.map_err(anyhow::Error::msg);
// Bridge the channel to an AsyncRead
let stream = tokio_stream::wrappers::ReceiverStream::new(rx_buf);
let reader = tokio_util::io::StreamReader::new(stream);
Ok((reader, worker))
}

// Helper function invoked to synchronously parse a `docker-archive:` formatted tar stream, finding
// the desired layer tarball and writing its contents via a stream of byte chunks
// to a channel.
fn find_layer_tar_sync(
pipein: impl Read + Send + Unpin,
blob_symlink_target: String,
tx_buf: tokio::sync::mpsc::Sender<std::io::Result<bytes::Bytes>>,
) -> Result<()> {
let mut archive = tar::Archive::new(pipein);
let mut buf = vec![0u8; 8192];
let mut found = false;
for entry in archive.entries()? {
let mut entry = entry.context("Reading entry")?;
if found {
// Continue to read to the end to avoid broken pipe error from skopeo
continue;
}
let path = entry.path()?;
let path: &Utf8Path = path.deref().try_into()?;
// We generally expect our layer to be first, but let's just skip anything
// unexpected to be robust against changes in skopeo.
if path.extension() != Some("tar") {
continue;
}
event!(Level::DEBUG, "Found {}", path);

match entry.header().entry_type() {
tar::EntryType::Symlink => {
if let Some(name) = path.file_name() {
if name == "layer.tar" {
let target = entry
.link_name()?
.ok_or_else(|| anyhow!("Invalid link {}", path))?;
let target = Utf8Path::from_path(&*target)
.ok_or_else(|| anyhow!("Invalid non-UTF8 path {:?}", target))?;
if target != blob_symlink_target {
return Err(anyhow!(
"Found unexpected layer link {} -> {}",
path,
target
));
}
}
}
}
tar::EntryType::Regular => loop {
let n = entry
.read(&mut buf[..])
.context("Reading tar file contents")?;
let done = 0 == n;
let r = Ok::<_, std::io::Error>(bytes::Bytes::copy_from_slice(&buf[0..n]));
let receiver_closed = tx_buf.blocking_send(r).is_err();
if receiver_closed || done {
found = true;
break;
}
},
_ => continue,
}
}
if found {
Ok(())
} else {
Err(anyhow!("Failed to find layer {}", blob_symlink_target))
}
}

/// Fetch a remote docker/OCI image and extract a specific uncompressed layer.
async fn fetch_layer<'s>(
imgref: &OstreeImageReference,
blobid: &str,
progress: Option<tokio::sync::watch::Sender<ImportProgress>>,
) -> Result<(
impl AsyncRead + Unpin + Send,
impl Future<Output = Result<()>>,
)> {
let mut proc = skopeo::new_cmd();
proc.stdout(Stdio::null());
let tempdir = tempfile::Builder::new()
.prefix("ostree-rs-ext")
.tempdir_in("/var/tmp")?;
let tempdir = Utf8Path::from_path(tempdir.path()).unwrap();
let fifo = &tempdir.join("skopeo.pipe");
nix::unistd::mkfifo(
fifo.as_os_str(),
nix::sys::stat::Mode::from_bits(0o600).unwrap(),
)?;
tracing::trace!("skopeo pull starting to {}", fifo);
proc.arg("copy")
.arg(imgref.imgref.to_string())
.arg(format!("docker-archive:{}", fifo));
let proc = skopeo::spawn(proc)?;
let fifo_reader = ProgressReader {
reader: Box::new(tokio::fs::File::open(fifo).await?),
progress,
};
let waiter = async move {
let res = proc.wait_with_output().await?;
if !res.status.success() {
return Err(anyhow!(
"skopeo failed: {}\n{}",
res.status,
String::from_utf8_lossy(&res.stderr)
));
}
Ok(())
}
.boxed();
let (contents, worker) = find_layer_tar(fifo_reader, blobid).await?;
// This worker task joins the result of the stream processing thread with monitoring the skopeo process.
let worker = async move {
let (worker, waiter) = tokio::join!(worker, waiter);
// Explicitly declare as `()` to verify we have the right number of `?`.
let _: () = waiter?;
let _: () = worker??;
Ok::<_, anyhow::Error>(())
};
Ok((contents, worker))
}

/// The result of an import operation
#[derive(Debug)]
pub struct Import {
@@ -298,7 +143,8 @@ pub async fn import(
imgref: &OstreeImageReference,
options: Option<ImportOptions>,
) -> Result<Import> {
let (manifest, image_digest) = fetch_manifest(imgref).await?;
let mut proxy = skopeo::ImageProxy::new(&imgref.imgref).await?;
let (image_digest, manifest) = proxy.fetch_manifest().await?;
let ostree_commit = import_from_manifest(repo, imgref, &manifest, options).await?;
Ok(Import {
ostree_commit,
@@ -324,19 +170,23 @@ pub async fn import_from_manifest(
let manifest: oci::Manifest = serde_json::from_slice(manifest_bytes)?;
let layerid = require_one_layer_blob(&manifest)?;
event!(Level::DEBUG, "target blob: {}", layerid);
let (blob, worker) = fetch_layer(imgref, layerid, options.progress).await?;
let blob = tokio::io::BufReader::new(blob);
let mut proxy = skopeo::ImageProxy::new(&imgref.imgref).await?;
let blob = proxy.fetch_blob(layerid).await?;
let blob = async_compression::tokio::bufread::GzipDecoder::new(blob);
let blob = ProgressReader {
reader: Box::new(blob),
progress: options.progress,
};
let mut taropts: crate::tar::TarImportOptions = Default::default();
match &imgref.sigverify {
SignatureSource::OstreeRemote(remote) => taropts.remote = Some(remote.clone()),
SignatureSource::ContainerPolicy | SignatureSource::ContainerPolicyAllowInsecure => {}
}
let import = crate::tar::import_tar(repo, blob, Some(taropts));
let (ostree_commit, worker) = tokio::join!(import, worker);
// Let any errors from skopeo take precedence, because a failure to parse/find the layer tarball
// is likely due to an underlying error from that.
let _: () = worker?;
let ostree_commit = ostree_commit?;
let ostree_commit = crate::tar::import_tar(repo, blob, Some(taropts))
.await
.with_context(|| format!("Parsing blob {}", layerid))?;
// FIXME write ostree commit after proxy finalization
proxy.finalize().await?;
event!(Level::DEBUG, "created commit {}", ostree_commit);
Ok(ostree_commit)
}
