Skip to content

Commit 2ba4e70

Browse files
committed
update chunking size to 10mb and re-use client on upload
1 parent cf332e3 commit 2ba4e70

File tree

4 files changed

+131
-23
lines changed

src/lib/src/api/client.rs

+6
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
44
use crate::config::AuthConfig;
55
use crate::error::OxenError;
6+
use crate::model::RemoteRepository;
67
use crate::view::http;
78
use crate::view::OxenResponse;
89

@@ -57,6 +58,11 @@ fn new_for_host<S: AsRef<str>>(host: S, should_add_user_agent: bool) -> Result<C
5758
}
5859
}
5960

61+
pub fn builder_for_remote_repo(remote_repo: &RemoteRepository) -> Result<ClientBuilder, OxenError> {
62+
let host = get_host_from_url(remote_repo.url())?;
63+
builder_for_host(host, true)
64+
}
65+
6066
pub fn builder_for_url<U: IntoUrl>(url: U) -> Result<ClientBuilder, OxenError> {
6167
let host = get_host_from_url(url)?;
6268
builder_for_host(host, true)

src/lib/src/api/client/commits.rs

+108-16
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use bytesize::ByteSize;
2929
use flate2::write::GzEncoder;
3030
use flate2::Compression;
3131
use futures_util::TryStreamExt;
32+
use http::header::CONTENT_LENGTH;
3233
use indicatif::{ProgressBar, ProgressStyle};
3334

3435
pub struct ChunkParams {
@@ -954,6 +955,104 @@ pub async fn upload_data_chunk_to_server_with_retry(
954955
)))
955956
}
956957

958+
pub async fn upload_data_chunk_to_server_with_client_with_retry(
959+
client: &reqwest::Client,
960+
remote_repo: &RemoteRepository,
961+
chunk: &[u8],
962+
hash: &str,
963+
params: &ChunkParams,
964+
is_compressed: bool,
965+
filename: &Option<String>,
966+
) -> Result<(), OxenError> {
967+
let mut total_tries = 0;
968+
let mut last_error = String::from("");
969+
while total_tries < constants::NUM_HTTP_RETRIES {
970+
match upload_data_chunk_to_server_with_client(
971+
client,
972+
remote_repo,
973+
chunk,
974+
hash,
975+
params,
976+
is_compressed,
977+
filename,
978+
)
979+
.await
980+
{
981+
Ok(_) => {
982+
return Ok(());
983+
}
984+
Err(err) => {
985+
total_tries += 1;
986+
// Exponentially back off
987+
let sleep_time = total_tries * total_tries;
988+
log::debug!(
989+
"upload_data_chunk_to_server_with_client_with_retry upload failed sleeping {}: {}",
990+
sleep_time,
991+
err
992+
);
993+
last_error = format!("{}", err);
994+
std::thread::sleep(std::time::Duration::from_secs(sleep_time));
995+
}
996+
}
997+
}
998+
999+
Err(OxenError::basic_str(format!(
1000+
"Upload chunk retry failed. {}",
1001+
last_error
1002+
)))
1003+
}
1004+
1005+
async fn upload_data_chunk_to_server_with_client(
1006+
client: &reqwest::Client,
1007+
remote_repo: &RemoteRepository,
1008+
chunk: &[u8],
1009+
hash: &str,
1010+
params: &ChunkParams,
1011+
is_compressed: bool,
1012+
filename: &Option<String>,
1013+
) -> Result<StatusMessage, OxenError> {
1014+
let maybe_filename = if !is_compressed {
1015+
format!(
1016+
"&filename={}",
1017+
urlencoding::encode(
1018+
filename
1019+
.as_ref()
1020+
.expect("Must provide filename if !compressed")
1021+
)
1022+
)
1023+
} else {
1024+
String::from("")
1025+
};
1026+
1027+
let uri = format!(
1028+
"/commits/upload_chunk?chunk_num={}&total_size={}&hash={}&total_chunks={}&is_compressed={}{}",
1029+
params.chunk_num, params.total_size, hash, params.total_chunks, is_compressed, maybe_filename);
1030+
let url = api::endpoint::url_from_repo(remote_repo, &uri)?;
1031+
let total_size = chunk.len() as u64;
1032+
log::debug!(
1033+
"upload_data_chunk_to_server posting {} to url {}",
1034+
ByteSize::b(total_size),
1035+
url
1036+
);
1037+
1038+
let res = client
1039+
.post(&url)
1040+
.header(CONTENT_LENGTH, total_size.to_string())
1041+
.body(chunk.to_owned())
1042+
.send()
1043+
.await?;
1044+
let body = client::parse_json_body(&url, res).await?;
1045+
1046+
log::debug!("upload_data_chunk_to_server got response {}", body);
1047+
let response: Result<StatusMessage, serde_json::Error> = serde_json::from_str(&body);
1048+
match response {
1049+
Ok(response) => Ok(response),
1050+
Err(err) => Err(OxenError::basic_str(format!(
1051+
"upload_data_chunk_to_server Err deserializing: {err}"
1052+
))),
1053+
}
1054+
}
1055+
9571056
async fn upload_data_chunk_to_server(
9581057
remote_repo: &RemoteRepository,
9591058
chunk: &[u8],
@@ -990,23 +1089,16 @@ async fn upload_data_chunk_to_server(
9901089
.timeout(time::Duration::from_secs(120))
9911090
.build()?;
9921091

993-
match client.post(&url).body(chunk.to_owned()).send().await {
994-
Ok(res) => {
995-
let body = client::parse_json_body(&url, res).await?;
1092+
let res = client.post(&url).body(chunk.to_owned()).send().await?;
1093+
let body = client::parse_json_body(&url, res).await?;
9961094

997-
log::debug!("upload_data_chunk_to_server got response {}", body);
998-
let response: Result<StatusMessage, serde_json::Error> = serde_json::from_str(&body);
999-
match response {
1000-
Ok(response) => Ok(response),
1001-
Err(err) => Err(OxenError::basic_str(format!(
1002-
"upload_data_chunk_to_server Err deserializing: {err}"
1003-
))),
1004-
}
1005-
}
1006-
Err(e) => {
1007-
let err_str = format!("Err upload_data_chunk_to_server: {e:?}");
1008-
Err(OxenError::basic_str(err_str))
1009-
}
1095+
log::debug!("upload_data_chunk_to_server got response {}", body);
1096+
let response: Result<StatusMessage, serde_json::Error> = serde_json::from_str(&body);
1097+
match response {
1098+
Ok(response) => Ok(response),
1099+
Err(err) => Err(OxenError::basic_str(format!(
1100+
"upload_data_chunk_to_server Err deserializing: {err}"
1101+
))),
10101102
}
10111103
}
10121104

src/lib/src/constants.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -173,10 +173,10 @@ pub const EVAL_ERROR_COL: &str = "_oxen_eval_error";
173173
pub const EVAL_DURATION_COL: &str = "_oxen_eval_duration";
174174

175175
// Data transfer
176-
// Average chunk size of ~4mb
177-
/// Average chunk size of ~4mb when chunking and sending data
176+
// Average chunk size of ~10mb
177+
/// Average chunk size of ~10mb when chunking and sending data
178178
// pub const AVG_CHUNK_SIZE: u64 = 1024 * 1024 * 4;
179-
pub const AVG_CHUNK_SIZE: u64 = 1024 * 1024 * 4;
179+
pub const AVG_CHUNK_SIZE: u64 = 1024 * 1024 * 10;
180180
// Retry and back off of requests N times
181181
/// Retry and back off of requests N times
182182
#[cfg(test)]

src/lib/src/core/v_latest/push.rs

+14-4
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ use std::sync::Arc;
99
use tokio::time::Duration;
1010

1111
use crate::api::client::commits::ChunkParams;
12+
use crate::constants::AVG_CHUNK_SIZE;
1213
use crate::constants::DEFAULT_REMOTE_NAME;
13-
use crate::constants::{self, AVG_CHUNK_SIZE};
1414
use crate::core::progress::push_progress::PushProgress;
1515
use crate::error::OxenError;
1616
use crate::model::entry::commit_entry::Entry;
@@ -446,13 +446,20 @@ async fn upload_large_file_chunks(
446446
let mut total_bytes_read = 0;
447447
let mut chunk_size = chunk_size;
448448

449+
// Create a client for uploading chunks
450+
let client = api::client::builder_for_remote_repo(&remote_repo)
451+
.unwrap()
452+
.build()
453+
.unwrap();
454+
449455
// Create queues for sending data to workers
450456
type PieceOfWork = (
451457
Vec<u8>,
452458
u64, // chunk size
453459
usize, // chunk num
454460
usize, // total chunks
455461
u64, // total size
462+
reqwest::Client,
456463
RemoteRepository,
457464
String, // entry hash
458465
Commit,
@@ -462,13 +469,13 @@ async fn upload_large_file_chunks(
462469
// In order to upload chunks in parallel
463470
// We should only read N chunks at a time so that
464471
// the whole file does not get read into memory
465-
let sub_chunk_size = constants::DEFAULT_NUM_WORKERS;
472+
let sub_chunk_size = concurrency::num_threads_for_items(total_chunks);
466473

467474
let mut total_chunk_idx = 0;
468475
let mut processed_chunk_idx = 0;
469476
let num_sub_chunks = (total_chunks / sub_chunk_size) + 1;
470477
log::debug!(
471-
"upload_large_file_chunks {:?} proccessing file in {} subchunks of size {} from total {} chunk size {} file size {}",
478+
"upload_large_file_chunks {:?} processing file in {} subchunks of size {} from total {} chunk size {} file size {}",
472479
entry.path(),
473480
num_sub_chunks,
474481
sub_chunk_size,
@@ -530,6 +537,7 @@ async fn upload_large_file_chunks(
530537
processed_chunk_idx, // Needs to be the overall chunk num
531538
total_chunks,
532539
total_bytes,
540+
client.clone(),
533541
remote_repo.to_owned(),
534542
entry.hash().to_owned(),
535543
commit.to_owned(),
@@ -548,6 +556,7 @@ async fn upload_large_file_chunks(
548556
chunk_num,
549557
total_chunks,
550558
total_size,
559+
client,
551560
remote_repo,
552561
entry_hash,
553562
_commit,
@@ -568,7 +577,8 @@ async fn upload_large_file_chunks(
568577
};
569578

570579
let is_compressed = false;
571-
match api::client::commits::upload_data_chunk_to_server_with_retry(
580+
match api::client::commits::upload_data_chunk_to_server_with_client_with_retry(
581+
&client,
572582
&remote_repo,
573583
&buffer,
574584
&entry_hash,

0 commit comments

Comments
 (0)