Skip to content

Commit

Permalink
Reduce memory usage (#148)
Browse files Browse the repository at this point in the history
Client memory footprint during file upload has been high due to several sources of inefficiency. This commit fixes some of them:
- Reduce disk read queue size.
- Limit upload concurrency to hardware parallelism (# of cores).
- Drop large buffer preemptively before "slow" network transmission.

Context:
Before the above changes, the theoretical memory usage was:


![405850025-bdfc5fe0-f891-4fba-9b39-44be6aa3b6f6](https://github.com/user-attachments/assets/5f0b05f7-196d-4353-a474-ef251a56af76)


This PR reduces the memory usage to the amount shown below without compromising speed
(benchmarked with the same settings as in
https://www.notion.so/huggingface2/Xorb-upload-speed-1531384ebcac8012bf74fa08ed822f67):



![405850334-3de3648b-b30d-41a9-a0f7-9088ee6f7644](https://github.com/user-attachments/assets/c5e2b5bf-18c2-4d5c-a0f6-c9bf89e77b92)


There is still ~650 MiB of extra memory footprint that demands further
investigation. The 512 MiB memory footprint of the global Xorb uploader can
also be significantly reduced, but that requires more work in the
future.
  • Loading branch information
seanses authored Jan 23, 2025
1 parent d5ced3d commit f13a722
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 9 deletions.
8 changes: 5 additions & 3 deletions cas_client/src/remote_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl UploadClient for RemoteClient {
hash: *hash,
};

let was_uploaded = self.upload(&key, &data, chunk_and_boundaries).await?;
let was_uploaded = self.upload(&key, data, chunk_and_boundaries).await?;

if !was_uploaded {
debug!("{key:?} not inserted into CAS.");
Expand Down Expand Up @@ -243,7 +243,7 @@ impl RemoteClient {
pub async fn upload(
&self,
key: &Key,
contents: &[u8],
contents: Vec<u8>,
chunk_and_boundaries: Vec<(MerkleHash, u32)>,
) -> Result<bool> {
let url = Url::parse(&format!("{}/xorb/{key}", self.endpoint))?;
Expand All @@ -253,10 +253,12 @@ impl RemoteClient {
let (_, _) = CasObject::serialize(
&mut writer,
&key.hash,
contents,
&contents,
&chunk_and_boundaries,
cas_object::CompressionScheme::LZ4,
)?;
// free memory before the "slow" network transfer below
drop(contents);

debug!("Upload: POST to {url:?} for {key:?}");
writer.set_position(0);
Expand Down
4 changes: 2 additions & 2 deletions data/src/clean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use mdb_shard::file_structs::{
};
use mdb_shard::{hash_is_global_dedup_eligible, ShardFileManager};
use merkledb::aggregate_hashes::file_node_hash;
use merkledb::constants::TARGET_CAS_BLOCK_SIZE;
use merkledb::constants::{IDEAL_CAS_BLOCK_SIZE, TARGET_CAS_BLOCK_SIZE, TARGET_CDC_CHUNK_SIZE};
use merklehash::MerkleHash;
use sha2::{Digest, Sha256};
use tokio::sync::mpsc::error::TryRecvError;
Expand Down Expand Up @@ -161,7 +161,7 @@ impl Cleaner {
) -> Result<Arc<Self>> {
let (data_p, data_c) = channel::<BufferItem<Vec<u8>>>(buffer_size);

let (chunk_p, chunk_c) = channel::<Option<ChunkYieldType>>(buffer_size);
let (chunk_p, chunk_c) = channel::<Option<ChunkYieldType>>(IDEAL_CAS_BLOCK_SIZE / TARGET_CDC_CHUNK_SIZE); // enough to fill one CAS block

let chunker = chunk_target_default(data_c, chunk_p, threadpool.clone());

Expand Down
20 changes: 16 additions & 4 deletions data/src/data_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ use std::env::current_dir;
use std::fs;
use std::fs::File;
use std::io::{BufReader, Read, Write};
use std::num::NonZero;
use std::path::PathBuf;
use std::sync::Arc;

use cas_client::CacheConfig;
use dirs::home_dir;
use lazy_static::lazy_static;
use merkledb::constants::IDEAL_CAS_BLOCK_SIZE;
use parutils::{tokio_par_for_each, ParallelError};
use tempfile::{tempdir_in, TempDir};
use utils::auth::{AuthConfig, TokenRefresher};
Expand All @@ -18,8 +21,12 @@ use crate::errors::DataProcessingError;
use crate::{errors, PointerFile, PointerFileTranslator};

// Concurrency in number of files
const MAX_CONCURRENT_UPLOADS: usize = 8; // TODO
const MAX_CONCURRENT_DOWNLOADS: usize = 8; // TODO
lazy_static! {
// Upload may be CPU-bound, this depends on network bandwidth and CPU speed
static ref MAX_CONCURRENT_UPLOADS: usize =
std::thread::available_parallelism().unwrap_or(NonZero::new(8).unwrap()).get();
}
const MAX_CONCURRENT_DOWNLOADS: usize = 8; // Download is not CPU-bound

// We now process every file delegated from the Python library.
const SMALL_FILE_THRESHOLD: usize = 1;
Expand Down Expand Up @@ -101,7 +108,7 @@ pub async fn upload_async(
let processor = Arc::new(PointerFileTranslator::new(config, threadpool, progress_updater, false).await?);

// for all files, clean them, producing pointer files.
let pointers = tokio_par_for_each(file_paths, MAX_CONCURRENT_UPLOADS, |f, _| async {
let pointers = tokio_par_for_each(file_paths, *MAX_CONCURRENT_UPLOADS, |f, _| async {
let proc = processor.clone();
clean_file(&proc, f).await
})
Expand Down Expand Up @@ -160,7 +167,12 @@ async fn clean_file(processor: &PointerFileTranslator, f: String) -> errors::Res
let mut read_buf = vec![0u8; READ_BLOCK_SIZE];
let path = PathBuf::from(f);
let mut reader = BufReader::new(File::open(path.clone())?);
let handle = processor.start_clean(1024, None).await?;
let handle = processor
.start_clean(
IDEAL_CAS_BLOCK_SIZE / READ_BLOCK_SIZE, // enough to fill one CAS block
Some(&path), // for logging & telemetry
)
.await?;

loop {
let bytes = reader.read(&mut read_buf)?;
Expand Down

0 comments on commit f13a722

Please sign in to comment.