Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(oci/client): update unpack_archive_layer to take cache; make pub #2523

Merged
merged 1 commit into from
May 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 41 additions & 39 deletions crates/oci/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ impl Client {
this.cache.write_wasm(&bytes, &layer.digest).await?;
}
ARCHIVE_MEDIATYPE => {
this.unpack_archive_layer(&bytes, &layer.digest).await?;
unpack_archive_layer(&this.cache, &bytes, &layer.digest).await?;
}
_ => {
this.cache.write_data(&bytes, &layer.digest).await?;
Expand Down Expand Up @@ -515,44 +515,6 @@ impl Client {
}
}

/// Unpack archive layer into self.cache
async fn unpack_archive_layer(
&self,
bytes: impl AsRef<[u8]>,
digest: impl AsRef<str>,
) -> Result<()> {
// Write archive layer to cache as usual
self.cache.write_data(&bytes, &digest).await?;

// Unpack archive into a staging dir
let path = self
.cache
.data_file(&digest)
.context("unable to read archive layer from cache")?;
let staging_dir = tempfile::tempdir()?;
crate::utils::unarchive(path.as_ref(), staging_dir.path()).await?;

// Traverse unpacked contents and if a file, write to cache by digest
// (if it doesn't already exist)
for entry in WalkDir::new(staging_dir.path()) {
let entry = entry?;
if entry.file_type().is_file() && !entry.file_type().is_dir() {
let bytes = tokio::fs::read(entry.path()).await?;
let digest = format!("sha256:{}", sha256::hex_digest_from_bytes(&bytes));
if self.cache.data_file(&digest).is_ok() {
tracing::debug!(
"Skipping unpacked asset {:?}; file already exists",
entry.path()
);
} else {
tracing::debug!("Adding unpacked asset {:?} to cache", entry.path());
self.cache.write_data(bytes, &digest).await?;
}
}
}
Ok(())
}

/// Save a credential set containing the registry username and password.
pub async fn login(
server: impl AsRef<str>,
Expand Down Expand Up @@ -655,6 +617,46 @@ impl Client {
}
}

/// Unpack contents of the provided archive layer, represented by bytes and its
/// corresponding digest, into the provided cache.
/// A temporary staging directory is created via tempfile::tempdir() to store
/// the unpacked contents prior to writing to the cache.
pub async fn unpack_archive_layer(
cache: &Cache,
bytes: impl AsRef<[u8]>,
digest: impl AsRef<str>,
) -> Result<()> {
// Write archive layer to cache as usual
cache.write_data(&bytes, &digest).await?;

// Unpack archive into a staging dir
let path = cache
.data_file(&digest)
.context("unable to read archive layer from cache")?;
let staging_dir = tempfile::tempdir()?;
crate::utils::unarchive(path.as_ref(), staging_dir.path()).await?;

// Traverse unpacked contents and if a file, write to cache by digest
// (if it doesn't already exist)
for entry in WalkDir::new(staging_dir.path()) {
let entry = entry?;
if entry.file_type().is_file() && !entry.file_type().is_dir() {
let bytes = tokio::fs::read(entry.path()).await?;
let digest = format!("sha256:{}", sha256::hex_digest_from_bytes(&bytes));
if cache.data_file(&digest).is_ok() {
tracing::debug!(
"Skipping unpacked asset {:?}; file already exists",
entry.path()
);
} else {
tracing::debug!("Adding unpacked asset {:?} to cache", entry.path());
cache.write_data(bytes, &digest).await?;
}
}
}
Ok(())
}

fn digest_from_url(manifest_url: &str) -> Option<String> {
// The URL is in the form "https://host/v2/refname/manifests/sha256:..."
let manifest_url = Url::parse(manifest_url).ok()?;
Expand Down
Loading