start, refactor Blob impl for Azure
* move fetching each chunk of a Part into a tokio::task
* reduce copying in the case we get an invalid content-length header
* add metrics for tracking the number of responses missing content-length
ParkMyCar committed Jan 21, 2025
1 parent 925f577 commit 8cc2778
121 changes: 79 additions & 42 deletions src/persist/src/azure.rs
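
The first bullet is the heart of the refactor: each chunk of a multi-part get is now fetched on its own tokio task, and the handles are awaited in order. Below is a minimal sketch of that pattern in plain tokio; fetch_chunk, get_blob, and the String error type are illustrative assumptions rather than this crate's API, and the explicit abort on the error path is what mz_ore's abort_on_drop wrapper automates in the diff.

use tokio::task::JoinHandle;

// Hypothetical stand-in for one chunked read off the network.
async fn fetch_chunk(idx: usize) -> Result<Vec<u8>, String> {
    Ok(vec![idx as u8; 1024])
}

async fn get_blob(num_chunks: usize) -> Result<Vec<Vec<u8>>, String> {
    // Spawn one task per chunk so the runtime drives every fetch
    // concurrently, instead of this single future polling them in turn.
    let handles: Vec<JoinHandle<Result<Vec<u8>, String>>> = (0..num_chunks)
        .map(|idx| tokio::spawn(fetch_chunk(idx)))
        .collect();

    // Await the handles in order so the chunks reassemble correctly.
    let mut chunks = Vec::with_capacity(handles.len());
    let mut handles = handles.into_iter();
    while let Some(handle) = handles.next() {
        match handle.await.map_err(|e| e.to_string()).and_then(|res| res) {
            Ok(chunk) => chunks.push(chunk),
            Err(err) => {
                // Cancel the in-flight tasks before bailing; dropping
                // abort-on-drop handles achieves the same thing.
                for handle in handles {
                    handle.abort();
                }
                return Err(err);
            }
        }
    }
    Ok(chunks)
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let chunks = get_blob(4).await?;
    assert_eq!(chunks.len(), 4);
    Ok(())
}

Awaiting the handles in order keeps the reassembled bytes in chunk order while still letting the runtime run all of the network reads concurrently.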
@@ -12,11 +12,12 @@
 use std::fmt::Debug;
 use std::sync::Arc;
 
-use anyhow::anyhow;
+use anyhow::{anyhow, Context};
 use async_trait::async_trait;
 use azure_core::StatusCode;
 use azure_identity::create_default_credential;
 use azure_storage::{prelude::*, CloudLocation, EMULATOR_ACCOUNT};
+use azure_storage_blobs::blob::operations::GetBlobResponse;
 use azure_storage_blobs::prelude::*;
 use bytes::Bytes;
 use futures_util::StreamExt;
@@ -185,29 +186,12 @@ impl Blob for AzureBlob {
     async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
         let path = self.get_path(key);
         let blob = self.client.blob_client(path);
-        let mut segments: Vec<MaybeLgBytes> = vec![];
 
-        // TODO: the default chunk size is 1MB. We have not tried tuning it,
-        // but making this configurable / running some benchmarks could be
-        // valuable.
-        let mut stream = blob.get().into_stream();
-        while let Some(value) = stream.next().await {
-            let response = match value {
-                Ok(v) => v,
-                Err(e) => {
-                    if let Some(e) = e.as_http_error() {
-                        if e.status() == StatusCode::NotFound {
-                            return Ok(None);
-                        }
-                    }
-
-                    return Err(ExternalError::from(anyhow!(
-                        "Azure blob get error: {:?}",
-                        e
-                    )));
-                }
-            };
-
+        /// Fetch the body of a single [`GetBlobResponse`].
+        async fn fetch_chunk(
+            response: GetBlobResponse,
+            metrics: S3BlobMetrics,
+        ) -> Result<MaybeLgBytes, ExternalError> {
             let content_length = response.blob.properties.content_length;
 
             // Here we're being quite defensive. If `content_length` comes back
@@ -216,35 +200,97 @@ impl Blob for AzureBlob {
             // buffer into lgalloc.
             let mut buffer = match content_length {
                 1.. => {
-                    let region = self
-                        .metrics
+                    let region = metrics
                         .lgbytes
                         .persist_azure
                         .new_region(usize::cast_from(content_length));
                     PreSizedBuffer::Sized(region)
                 }
-                0 => PreSizedBuffer::Unknown(Vec::new()),
+                0 => {
+                    metrics.get_invalid_resp.inc();
+                    PreSizedBuffer::Unknown(SegmentedBytes::new())
+                }
             };
 
             let mut body = response.data;
             while let Some(value) = body.next().await {
                 let value = value.map_err(|e| {
                     ExternalError::from(anyhow!("Azure blob get body error: {}", e))
                 })?;
-                buffer.extend_from_slice(&value);
+
+                match &mut buffer {
+                    PreSizedBuffer::Sized(region) => region.extend_from_slice(&value),
+                    PreSizedBuffer::Unknown(segments) => segments.push(value),
+                }
             }
 
             // Spill our bytes to lgalloc, if they aren't already.
-            let lg_bytes = match buffer {
+            let lgbytes = match buffer {
                 PreSizedBuffer::Sized(region) => LgBytes::from(Arc::new(region)),
-                PreSizedBuffer::Unknown(buffer) => {
-                    self.metrics.lgbytes.persist_azure.try_mmap(buffer)
+                // Now that we've collected all of the segments, we know the size of our region.
+                PreSizedBuffer::Unknown(segments) => {
+                    let mut region = metrics.lgbytes.persist_azure.new_region(segments.len());
+                    for segment in segments.into_segments() {
+                        region.extend_from_slice(segment.as_ref());
+                    }
+                    LgBytes::from(Arc::new(region))
                 }
             };
-            segments.push(MaybeLgBytes::LgBytes(lg_bytes));
+
+            Ok(MaybeLgBytes::LgBytes(lgbytes))
+        }
+
+        let mut handles = Vec::new();
+        // TODO: the default chunk size is 1MB. We have not tried tuning it,
+        // but making this configurable / running some benchmarks could be
+        // valuable.
+        let mut stream = blob.get().into_stream();
+
+        while let Some(value) = stream.next().await {
+            // Return early if any of the individual fetch requests return an error.
+            let response = match value {
+                Ok(v) => v,
+                Err(e) => {
+                    if let Some(e) = e.as_http_error() {
+                        if e.status() == StatusCode::NotFound {
+                            return Ok(None);
+                        }
+                    }
+
+                    return Err(ExternalError::from(anyhow!(
+                        "Azure blob get error: {:?}",
+                        e
+                    )));
+                }
+            };
+
+            // Fetch each chunk in a separate task for maximum concurrency.
+            //
+            // Note: Using `abort_on_drop` is important so we cancel any
+            // pending tasks if one hits an error.
+            let metrics = self.metrics.clone();
+            let handle = mz_ore::task::spawn(|| "azure-fetch-chunk", async move {
+                fetch_chunk(response, metrics).await
+            })
+            .abort_on_drop();
+
+            handles.push(handle);
         }
 
-        Ok(Some(SegmentedBytes::from(segments)))
+        // Await on all of our chunks.
+        //
+        // No concurrency is needed here because the individual fetches are
+        // driven in tokio::tasks.
+        let mut segments = SegmentedBytes::with_capacity(handles.len());
+        for handle in handles {
+            let segment = handle
+                .await
+                .context("task spawn failure")?
+                .context("azure get body err")?;
+            segments.push(segment);
+        }
+
+        Ok(Some(segments))
     }
 
     async fn list_keys_and_metadata(
@@ -343,16 +389,7 @@
 /// that as we read bytes off the network.
 enum PreSizedBuffer {
     Sized(MetricsRegion<u8>),
-    Unknown(Vec<u8>),
-}
-
-impl PreSizedBuffer {
-    fn extend_from_slice(&mut self, slice: &[u8]) {
-        match self {
-            PreSizedBuffer::Sized(region) => region.extend_from_slice(slice),
-            PreSizedBuffer::Unknown(buffer) => buffer.extend_from_slice(slice),
-        }
-    }
+    Unknown(SegmentedBytes),
 }
 
 #[cfg(test)]
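
The PreSizedBuffer change above covers the other two bullets: when Content-Length is usable the lgalloc region is allocated once up front, and when it is missing or invalid the code now collects the raw segments and copies them exactly once at the end (incrementing the new get_invalid_resp counter), instead of growing an intermediate Vec<u8> that then had to be copied into lgalloc anyway. A rough sketch of the strategy, using std types as assumed stand-ins for MetricsRegion and SegmentedBytes:

enum PreSized {
    // Content-Length was present: one exact allocation up front.
    Sized(Vec<u8>),
    // Header missing or invalid: stash segments, defer the copy.
    Unknown(Vec<Vec<u8>>),
}

fn collect_body(chunks: impl IntoIterator<Item = Vec<u8>>, content_length: usize) -> Vec<u8> {
    let mut buffer = if content_length > 0 {
        PreSized::Sized(Vec::with_capacity(content_length))
    } else {
        // This is the spot where the real code bumps get_invalid_resp.
        PreSized::Unknown(Vec::new())
    };

    for chunk in chunks {
        match &mut buffer {
            PreSized::Sized(buf) => buf.extend_from_slice(&chunk),
            PreSized::Unknown(segments) => segments.push(chunk),
        }
    }

    match buffer {
        PreSized::Sized(buf) => buf,
        // The total length is now known, so a single exact copy suffices.
        PreSized::Unknown(segments) => {
            let total: usize = segments.iter().map(Vec::len).sum();
            let mut buf = Vec::with_capacity(total);
            for segment in segments {
                buf.extend_from_slice(&segment);
            }
            buf
        }
    }
}

fn main() {
    let body = collect_body([vec![1, 2], vec![3]], 0);
    assert_eq!(body, vec![1, 2, 3]);
}

Either way the bytes land in their final allocation with at most one copy; a trustworthy Content-Length just skips the intermediate segment list entirely.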
