
Commit 8f12c3b

Merge pull request #239 from Oxen-AI/fix/push-new-remote-branch
Push new remote branch on non-deep cloned repo
2 parents 0be229e + eb011f6 commit 8f12c3b

File tree: 7 files changed (+229, -107 lines)

src/lib/src/api/local/commits.rs (+14)

@@ -278,6 +278,20 @@ pub fn list_all(repo: &LocalRepository) -> Result<Vec<Commit>, OxenError> {
     Ok(commits)
 }
 
+pub fn list_all_paginated(
+    repo: &LocalRepository,
+    page_number: usize,
+    page_size: usize,
+) -> Result<PaginatedCommits, OxenError> {
+    let commits = list_all(repo)?;
+    let (commits, pagination) = util::paginate(commits, page_number, page_size);
+    Ok(PaginatedCommits {
+        status: StatusMessage::resource_found(),
+        commits,
+        pagination,
+    })
+}
+
 /// Get commit history given options
 pub async fn list_with_opts(
     repo: &LocalRepository,
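
A quick sketch of the slicing that `util::paginate` is assumed to do here: take the full Vec, cut out one page, and report totals. The `Pagination` struct below is a stand-in for illustration, not the actual Oxen type.

```rust
// Hypothetical stand-in for Oxen's util::paginate; field names are assumptions.
#[derive(Debug)]
struct Pagination {
    total_entries: usize,
    total_pages: usize,
    page_number: usize,
    page_size: usize,
}

fn paginate<T>(items: Vec<T>, page_number: usize, page_size: usize) -> (Vec<T>, Pagination) {
    let page_size = page_size.max(1); // guard against a zero page size
    let total_entries = items.len();
    // Ceiling division so a partial last page still counts as a page.
    let total_pages = (total_entries + page_size - 1) / page_size;
    // Assumes pages are 1-indexed, as DEFAULT_PAGE_NUM elsewhere in the diff suggests.
    let start = page_number.saturating_sub(1) * page_size;
    let page: Vec<T> = items.into_iter().skip(start).take(page_size).collect();
    (page, Pagination { total_entries, total_pages, page_number, page_size })
}

fn main() {
    let (page, meta) = paginate((1..=14).collect::<Vec<i32>>(), 2, 5);
    println!("{page:?} ({} entries, {} pages)", meta.total_entries, meta.total_pages);
    // prints: [6, 7, 8, 9, 10] (14 entries, 3 pages)
}
```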

src/lib/src/api/remote/commits.rs (+65)

@@ -69,6 +69,44 @@ pub async fn get_by_id(
     }
 }
 
+pub async fn list_all(remote_repo: &RemoteRepository) -> Result<Vec<Commit>, OxenError> {
+    let mut all_commits: Vec<Commit> = Vec::new();
+    let mut page_num = DEFAULT_PAGE_NUM;
+    let page_size = 100;
+
+    let bar = Arc::new(ProgressBar::new_spinner());
+    bar.set_style(ProgressStyle::default_spinner());
+
+    loop {
+        let page_opts = PaginateOpts {
+            page_num,
+            page_size,
+        };
+        match list_all_commits_paginated(remote_repo, &page_opts).await {
+            Ok(paginated_commits) => {
+                if page_num == DEFAULT_PAGE_NUM {
+                    let bar = oxify_bar(bar.clone(), ProgressBarType::Counter);
+                    bar.set_length(paginated_commits.pagination.total_entries as u64);
+                }
+                let n_commits = paginated_commits.commits.len();
+                all_commits.extend(paginated_commits.commits);
+                bar.inc(n_commits as u64);
+                if page_num < paginated_commits.pagination.total_pages {
+                    page_num += 1;
+                } else {
+                    break;
+                }
+            }
+            Err(err) => {
+                return Err(err);
+            }
+        }
+    }
+    bar.finish_and_clear();
+
+    Ok(all_commits)
+}
+
 pub async fn list_commit_history(
     remote_repo: &RemoteRepository,
     revision: &str,
@@ -138,6 +176,33 @@ async fn list_commit_history_paginated(
     }
 }
 
+async fn list_all_commits_paginated(
+    remote_repo: &RemoteRepository,
+    page_opts: &PaginateOpts,
+) -> Result<PaginatedCommits, OxenError> {
+    let page_num = page_opts.page_num;
+    let page_size = page_opts.page_size;
+    let uri = format!("/commits/all?page={page_num}&page_size={page_size}");
+    let url = api::endpoint::url_from_repo(remote_repo, &uri)?;
+
+    let client = client::new_for_url(&url)?;
+    match client.get(&url).send().await {
+        Ok(res) => {
+            let body = client::parse_json_body(&url, res).await?;
+            let response: Result<PaginatedCommits, serde_json::Error> = serde_json::from_str(&body);
+            match response {
+                Ok(j_res) => Ok(j_res),
+                Err(err) => Err(OxenError::basic_str(format!(
+                    "list_commit_history() Could not deserialize response [{err}]\n{body}"
+                ))),
+            }
+        }
+        Err(err) => Err(OxenError::basic_str(format!(
+            "list_commit_history() Request failed: {err}"
+        ))),
+    }
+}
+
 pub async fn commit_is_synced(
     remote_repo: &RemoteRepository,
     commit_id: &str,
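
The `list_all` loop above is a fetch-until-last-page accumulator: request a page, extend the results, and stop once `page_num` reaches `total_pages`. A minimal synchronous sketch of the same pattern, with a stubbed fetcher standing in for the async HTTP call (all names here are illustrative, not the Oxen API):

```rust
struct Page {
    items: Vec<String>,
    total_pages: usize,
}

// Stand-in for the remote call; in the real code this is an async HTTP request.
fn fetch_page(all: &[String], page_num: usize, page_size: usize) -> Page {
    let start = (page_num - 1) * page_size;
    Page {
        items: all.iter().skip(start).take(page_size).cloned().collect(),
        total_pages: (all.len() + page_size - 1) / page_size,
    }
}

fn list_all(all: &[String], page_size: usize) -> Vec<String> {
    let mut results = Vec::new();
    let mut page_num = 1; // assumed 1-indexed, like DEFAULT_PAGE_NUM
    loop {
        let page = fetch_page(all, page_num, page_size);
        results.extend(page.items);
        if page_num < page.total_pages {
            page_num += 1;
        } else {
            break; // last page reached (or the collection was empty)
        }
    }
    results
}

fn main() {
    let all: Vec<String> = (1..=7).map(|i| format!("commit_{i}")).collect();
    let commits = list_all(&all, 3);
    assert_eq!(commits.len(), 7); // three pages: 3 + 3 + 1
}
```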

src/lib/src/core/index/pusher.rs (+72, -71)

@@ -11,7 +11,7 @@ use flate2::write::GzEncoder;
 use flate2::Compression;
 use futures::prelude::*;
 use indicatif::ProgressBar;
-use std::collections::HashSet;
+use std::collections::{HashSet, VecDeque};
 
 use std::io::{BufReader, Read};
 use std::sync::Arc;
@@ -75,13 +75,6 @@ async fn validate_repo_is_pushable(
     commit_reader: &CommitReader,
     head_commit: &Commit,
 ) -> Result<bool, OxenError> {
-    // Make sure the remote branch is not ahead of the local branch
-    // log::debug!(
-    //     "validating repo is pushable for commit {:#?} and branch {:#?}",
-    //     head_commit,
-    //     branch
-    // );
-
     if remote_is_ahead_of_local(head_commit, remote_repo, commit_reader, branch).await? {
         log::debug!("remote is ahead of local for commit {:#?}", head_commit);
         if api::remote::commits::can_push(remote_repo, &branch.name, local_repo, head_commit)
@@ -94,17 +87,6 @@ async fn validate_repo_is_pushable(
             return Err(OxenError::upstream_merge_conflict());
         }
     }
-    // } else {
-    //     log::debug!("remote is not ahead of local for commit {:#?}", head_commit);
-    // }
-
-    if cannot_push_incomplete_history(local_repo, remote_repo, head_commit, branch).await? {
-        log::debug!(
-            "cannot_push_incomplete_history is true for commit {:#?}",
-            head_commit
-        );
-        return Err(OxenError::incomplete_local_history());
-    }
 
     Ok(false)
 }
@@ -200,18 +182,24 @@ pub async fn try_push_remote_repo(
     mut head_commit: Commit,
     requires_merge: bool,
 ) -> Result<(), OxenError> {
-    let commits_to_sync =
+    let commits_to_push =
         get_commit_objects_to_sync(local_repo, remote_repo, &head_commit, &branch).await?;
 
+    log::debug!("got these commits to push");
+
+    if !commits_to_push_are_synced(local_repo, &commits_to_push)? {
+        return Err(OxenError::incomplete_local_history());
+    }
+
     log::debug!(
         "push_remote_repo commit order after get_commit_objects_to_sync {:?}",
-        commits_to_sync
+        commits_to_push
     );
 
     let maybe_remote_branch = api::remote::branches::get_by_name(remote_repo, &branch.name).await?;
 
     let (unsynced_entries, _total_size) =
-        push_missing_commit_objects(local_repo, remote_repo, &commits_to_sync, &branch).await?;
+        push_missing_commit_objects(local_repo, remote_repo, &commits_to_push, &branch).await?;
 
     log::debug!("🐂 Identifying unsynced commits dbs...");
     let unsynced_db_commits =
@@ -301,18 +289,16 @@ async fn get_commit_objects_to_sync(
     branch: &Branch,
 ) -> Result<Vec<Commit>, OxenError> {
     let remote_branch = api::remote::branches::get_by_name(remote_repo, &branch.name).await?;
-
+    let commit_reader = CommitReader::new(local_repo)?;
     let mut commits_to_sync: Vec<Commit>;
-    // TODO: If remote branch does not yet, recreates all commits regardless of shared history.
-    // Not a huge deal performance-wise right now, but could be for very commit-heavy repos
     if let Some(remote_branch) = remote_branch {
         log::debug!(
             "get_commit_objects_to_sync found remote branch {:?}, calculating missing commits between local and remote heads", remote_branch
         );
         let remote_commit = api::remote::commits::get_by_id(remote_repo, &remote_branch.commit_id)
             .await?
             .unwrap();
-        let commit_reader = CommitReader::new(local_repo)?;
+
         let merger = Merger::new(local_repo)?;
         commits_to_sync =
             merger.list_commits_between_commits(&commit_reader, &remote_commit, local_commit)?;
@@ -333,9 +319,31 @@
                 .any(|remote_commit| remote_commit.id == commit.id)
         });
     } else {
-        // Branch does not exist on remote yet - get all commits?
-        log::debug!("get_commit_objects_to_sync remote branch does not exist, getting all commits from local head");
-        commits_to_sync = api::local::commits::list_from(local_repo, &local_commit.id)?;
+        // Remote branch does not exist. Find commits to push with reference to whatever
+        // remote branch head comes first in the local newbranch history, aka what it was branched off of.
+
+        // Early return to avoid checking for remote commits: if full local history and no remote branch,
+        // push full local branch history.
+        if api::local::commits::commit_history_is_complete(local_repo, local_commit) {
+            return api::local::commits::list_from(local_repo, &local_commit.id);
+        }
+
+        // Otherwise, find the remote commit that the local branch was branched off of and push everything since then.
+        let all_commits = api::remote::commits::list_all(remote_repo).await?;
+        log::debug!("got all remote commits as {:#?}", all_commits);
+        let maybe_remote_commit =
+            find_latest_local_commit_synced(local_repo, local_commit, &all_commits)?;
+
+        if let Some(remote_commit) = maybe_remote_commit {
+            let merger = Merger::new(local_repo)?;
+            commits_to_sync = merger.list_commits_between_commits(
+                &commit_reader,
+                &remote_commit,
+                local_commit,
+            )?;
+        } else {
+            commits_to_sync = api::local::commits::list_from(local_repo, &local_commit.id)?;
+        }
     }
 
     // Order from BASE to HEAD
@@ -474,54 +482,47 @@ async fn remote_is_ahead_of_local(
     Ok(!local_head.has_ancestor(&remote_commit.id, reader)?)
 }
 
-async fn cannot_push_incomplete_history(
+fn find_latest_local_commit_synced(
     local_repo: &LocalRepository,
-    remote_repo: &RemoteRepository,
     local_head: &Commit,
-    branch: &Branch,
-) -> Result<bool, OxenError> {
-    log::debug!("Checking if we can push incomplete history.");
-    match api::remote::commits::list_commit_history(remote_repo, &branch.name).await {
-        Err(_) => {
-            return Ok(!api::local::commits::commit_history_is_complete(
-                local_repo, local_head,
-            ));
+    remote_commits: &Vec<Commit>,
+) -> Result<Option<Commit>, OxenError> {
+    let commit_reader = CommitReader::new(local_repo).unwrap();
+    let mut commits_set: HashSet<String> = HashSet::new();
+    for remote_commit in remote_commits {
+        commits_set.insert(remote_commit.id.clone());
+    }
+    // let mut current_commit = local_head.clone();
+    let mut queue: VecDeque<Commit> = VecDeque::new();
+    queue.push_back(local_head.clone());
+
+    while !queue.is_empty() {
+        let current_commit = queue.pop_front().unwrap();
+        if commits_set.contains(&current_commit.id) {
+            return Ok(Some(current_commit));
        }
-        Ok(remote_history) => {
-            let remote_head = remote_history.first().unwrap();
-            log::debug!(
-                "Checking between local head {:?} and remote head {:?} on branch {}",
-                local_head,
-                remote_head,
-                branch.name
-            );
-
-            let commit_reader = CommitReader::new(local_repo)?;
-            let merger = Merger::new(local_repo)?;
-
-            let commits_to_push =
-                merger.list_commits_between_commits(&commit_reader, remote_head, local_head)?;
-
-            let commits_to_push: Vec<Commit> = commits_to_push
-                .into_iter()
-                .filter(|commit| {
-                    !remote_history
-                        .iter()
-                        .any(|remote_commit| remote_commit.id == commit.id)
-                })
-                .collect();
-
-            log::debug!("Found the following commits_to_push: {:?}", commits_to_push);
-            // Ensure all `commits_to_push` are synced
-            for commit in commits_to_push {
-                if !index::commit_sync_status::commit_is_synced(local_repo, &commit) {
-                    return Ok(true);
-                }
-            }
+        for parent_id in current_commit.parent_ids.iter() {
+            let parent_commit = commit_reader.get_commit_by_id(parent_id)?;
+            let Some(parent_commit) = parent_commit else {
+                return Err(OxenError::local_parent_link_broken(&current_commit.id));
+            };
+            queue.push_back(parent_commit);
        }
    }
+    Ok(None)
+}
 
-    Ok(false)
+fn commits_to_push_are_synced(
+    local_repo: &LocalRepository,
+    commits_to_push: &Vec<Commit>,
+) -> Result<bool, OxenError> {
+    for commit in commits_to_push {
+        if !index::commit_sync_status::commit_is_synced(local_repo, commit) {
+            log::debug!("commit is not synced {:?}", commit);
+            return Ok(false);
+        }
+    }
+    Ok(true)
 }
 
 async fn poll_until_synced(
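
`find_latest_local_commit_synced` is a breadth-first walk back through the local commit graph that stops at the first commit the remote already has, i.e. the point the new branch was forked from. A self-contained sketch of that search over a toy in-memory graph (the `Commit` struct below is a stand-in, not the Oxen type):

```rust
use std::collections::{HashMap, HashSet, VecDeque};

#[derive(Clone, Debug)]
struct Commit {
    id: String,
    parent_ids: Vec<String>,
}

/// Walk ancestors of `head` breadth-first and return the first commit
/// the remote already knows about, if any.
fn find_latest_synced(
    commits: &HashMap<String, Commit>,
    head: &Commit,
    remote_ids: &HashSet<String>,
) -> Option<Commit> {
    let mut queue: VecDeque<Commit> = VecDeque::new();
    queue.push_back(head.clone());

    while let Some(current) = queue.pop_front() {
        if remote_ids.contains(&current.id) {
            return Some(current);
        }
        for parent_id in &current.parent_ids {
            // In the real code a missing parent is an error (broken local history);
            // here we simply skip it.
            if let Some(parent) = commits.get(parent_id) {
                queue.push_back(parent.clone());
            }
        }
    }
    None
}

fn main() {
    // a <- b <- c (local head); the remote only has "a".
    let mk = |id: &str, parents: &[&str]| Commit {
        id: id.to_string(),
        parent_ids: parents.iter().map(|p| p.to_string()).collect(),
    };
    let commits: HashMap<String, Commit> = [mk("a", &[]), mk("b", &["a"]), mk("c", &["b"])]
        .into_iter()
        .map(|c| (c.id.clone(), c))
        .collect();
    let remote_ids: HashSet<String> = HashSet::from(["a".to_string()]);
    let found = find_latest_synced(&commits, &commits["c"], &remote_ids);
    println!("{:?}", found.map(|c| c.id)); // Some("a")
}
```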

src/lib/src/core/index/schema_reader.rs (-36)

@@ -182,42 +182,6 @@ impl SchemaReader {
 
     pub fn list_schema_entries(&self) -> Result<Vec<SchemaEntry>, OxenError> {
         log::debug!("trying to get root hash for commit {:?}", self.commit_id);
-        // Print out all the entries of dir_hashes db
-        // let iter = self.dir_hashes_db.iterator(rocksdb::IteratorMode::Start);
-
-        // log::debug!(
-        //     "iterating over dir_hashes_db for commit {:?}",
-        //     self.commit_id
-        // );
-        // for item in iter {
-        //     match item {
-        //         Ok((key, value)) => {
-        //             let key = match std::str::from_utf8(&key) {
-        //                 Ok(k) => k,
-        //                 Err(e) => {
-        //                     log::error!("Failed to convert key to string: {:?}", e);
-        //                     continue;
-        //                 }
-        //             };
-        //             let value = match std::str::from_utf8(&value) {
-        //                 Ok(v) => v,
-        //                 Err(e) => {
-        //                     log::error!("Failed to convert value to string: {:?}", e);
-        //                     continue;
-        //                 }
-        //             };
-        //             log::debug!("key {:?} value {:?}", key, value);
-        //         }
-        //         Err(e) => {
-        //             log::error!("Iterator error: {:?}", e);
-        //         }
-        //     }
-        // }
-
-        // log::debug!("done iterating");
-
-        // log::debug!("done iterating round 2");
-
         let root_hash: String = path_db::get_entry(&self.dir_hashes_db, "")?.ok_or(
             OxenError::basic_str("Could not find root hash in dir hashes db"),
         )?;

src/server/src/controllers/commits.rs (+24)

@@ -117,6 +117,30 @@ pub async fn commit_history(
     }
 }
 
+// List all commits in the rpeo
+pub async fn list_all(
+    req: HttpRequest,
+    query: web::Query<PageNumQuery>,
+) -> actix_web::Result<HttpResponse, OxenHttpError> {
+    let app_data = app_data(&req)?;
+    let namespace: Option<&str> = req.match_info().get("namespace");
+    let repo_name: Option<&str> = req.match_info().get("repo_name");
+
+    let page: usize = query.page.unwrap_or(constants::DEFAULT_PAGE_NUM);
+    let page_size: usize = query.page_size.unwrap_or(constants::DEFAULT_PAGE_SIZE);
+
+    if let (Some(namespace), Some(repo_name)) = (namespace, repo_name) {
+        let repo = get_repo(&app_data.path, namespace, repo_name)?;
+
+        let paginated_commits = api::local::commits::list_all_paginated(&repo, page, page_size)?;
+
+        Ok(HttpResponse::Ok().json(paginated_commits))
+    } else {
+        let msg = "Must supply `namespace`, `repo_name` params";
+        Err(OxenHttpError::BadRequest(msg.into()))
+    }
+}
+
 pub async fn show(req: HttpRequest) -> actix_web::Result<HttpResponse, OxenHttpError> {
     let app_data = app_data(&req)?;
     let namespace = path_param(&req, "namespace")?;
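
The handler follows a common actix-web shape: read optional `page`/`page_size` query params, fall back to defaults, and return a JSON page. A minimal sketch of that pattern outside the Oxen codebase (route, type names, and data here are illustrative, not the real server):

```rust
use actix_web::{web, App, HttpResponse, HttpServer};
use serde::{Deserialize, Serialize};

#[derive(Deserialize)]
struct PageQuery {
    page: Option<usize>,
    page_size: Option<usize>,
}

#[derive(Serialize)]
struct PageResponse {
    page: usize,
    page_size: usize,
    items: Vec<String>,
}

async fn list_items(query: web::Query<PageQuery>) -> HttpResponse {
    // Fall back to defaults when the query params are omitted, as the handler above does.
    let page = query.page.unwrap_or(1);
    let page_size = query.page_size.unwrap_or(100);
    let all: Vec<String> = (0..250).map(|i| format!("item_{i}")).collect();
    let items = all
        .into_iter()
        .skip(page.saturating_sub(1) * page_size)
        .take(page_size)
        .collect();
    HttpResponse::Ok().json(PageResponse { page, page_size, items })
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| App::new().route("/items", web::get().to(list_items)))
        .bind(("127.0.0.1", 8080))?
        .run()
        .await
}
```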

src/server/src/routes.rs (+4)

@@ -56,6 +56,10 @@ pub fn config(cfg: &mut web::ServiceConfig) {
             "/{namespace}/{repo_name}/objects_db",
             web::get().to(controllers::commits::download_objects_db),
         )
+        .route(
+            "/{namespace}/{repo_name}/commits/all",
+            web::get().to(controllers::commits::list_all),
+        )
         .route(
             "/{namespace}/{repo_name}/commits/{commit_id}/latest_synced",
             web::get().to(controllers::commits::latest_synced),
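
With the route registered, a client can page through every commit via GET `/{namespace}/{repo_name}/commits/all?page=N&page_size=M`. A hypothetical call with reqwest, assuming a locally running server; the host, port, and any API prefix in front of the repo path are guesses about deployment, not taken from the diff:

```rust
// Requires the reqwest crate with the "blocking" feature enabled.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical namespace/repo and base URL.
    let url = "http://localhost:3000/my-namespace/my-repo/commits/all?page=1&page_size=100";
    let body = reqwest::blocking::get(url)?.text()?;
    // The body is the JSON-serialized PaginatedCommits returned by the controller.
    println!("{body}");
    Ok(())
}
```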
