Skip to content

Commit 6ffdbd6

Browse files
committed
fix push remote branch
1 parent 8903850 commit 6ffdbd6

File tree

3 files changed

+173
-70
lines changed

3 files changed

+173
-70
lines changed

src/lib/src/core/index/pusher.rs

+123-34
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ use flate2::write::GzEncoder;
1111
use flate2::Compression;
1212
use futures::prelude::*;
1313
use indicatif::ProgressBar;
14-
use std::collections::HashSet;
14+
use std::collections::{HashMap, HashSet, VecDeque};
1515

16+
use std::hash::Hash;
1617
use std::io::{BufReader, Read};
1718
use std::sync::Arc;
1819

@@ -301,7 +302,7 @@ async fn get_commit_objects_to_sync(
301302
branch: &Branch,
302303
) -> Result<Vec<Commit>, OxenError> {
303304
let remote_branch = api::remote::branches::get_by_name(remote_repo, &branch.name).await?;
304-
305+
let commit_reader = CommitReader::new(local_repo)?;
305306
let mut commits_to_sync: Vec<Commit>;
306307
// TODO: If remote branch does not yet, recreates all commits regardless of shared history.
307308
// Not a huge deal performance-wise right now, but could be for very commit-heavy repos
@@ -312,7 +313,7 @@ async fn get_commit_objects_to_sync(
312313
let remote_commit = api::remote::commits::get_by_id(remote_repo, &remote_branch.commit_id)
313314
.await?
314315
.unwrap();
315-
let commit_reader = CommitReader::new(local_repo)?;
316+
316317
let merger = Merger::new(local_repo)?;
317318
commits_to_sync =
318319
merger.list_commits_between_commits(&commit_reader, &remote_commit, local_commit)?;
@@ -335,7 +336,36 @@ async fn get_commit_objects_to_sync(
335336
} else {
336337
// Branch does not exist on remote yet - get all commits?
337338
log::debug!("get_commit_objects_to_sync remote branch does not exist, getting all commits from local head");
338-
commits_to_sync = api::local::commits::list_from(local_repo, &local_commit.id)?;
339+
340+
// Do the branch thing again
341+
let branches: Vec<Branch> = api::remote::branches::list(remote_repo).await?;
342+
let maybe_remote_head =
343+
find_local_commit_matching_remote_branch(local_repo, local_commit, &branches)?;
344+
345+
if let Some(remote_head) = maybe_remote_head {
346+
let merger = Merger::new(local_repo)?;
347+
commits_to_sync =
348+
merger.list_commits_between_commits(&commit_reader, &remote_head, local_commit)?;
349+
350+
println!("🐂 Getting commit history...");
351+
let remote_history =
352+
api::remote::commits::list_commit_history(remote_repo, &branch.name)
353+
.await
354+
.unwrap_or_else(|_| vec![]);
355+
log::debug!(
356+
"get_commit_objects_to_sync calculated {} commits",
357+
commits_to_sync.len()
358+
);
359+
360+
// Filter out any commits_to_sync that are in the remote_history
361+
commits_to_sync.retain(|commit| {
362+
!remote_history
363+
.iter()
364+
.any(|remote_commit| remote_commit.id == commit.id)
365+
});
366+
} else {
367+
commits_to_sync = api::local::commits::list_from(local_repo, &local_commit.id)?;
368+
}
339369
}
340370

341371
// Order from BASE to HEAD
@@ -474,6 +504,41 @@ async fn remote_is_ahead_of_local(
474504
Ok(!local_head.has_ancestor(&remote_commit.id, reader)?)
475505
}
476506

507+
fn find_local_commit_matching_remote_branch(
508+
local_repo: &LocalRepository,
509+
local_head: &Commit,
510+
remote_branches: &Vec<Branch>,
511+
) -> Result<Option<Commit>, OxenError> {
512+
let commit_reader = CommitReader::new(local_repo).unwrap();
513+
let mut branches_map: HashMap<String, String> = HashMap::new();
514+
for remote_branch in remote_branches {
515+
branches_map.insert(remote_branch.commit_id.clone(), remote_branch.name.clone());
516+
}
517+
// let mut current_commit = local_head.clone();
518+
let mut queue: VecDeque<Commit> = VecDeque::new();
519+
queue.push_back(local_head.clone());
520+
521+
while queue.len() > 0 {
522+
let current_commit = queue.pop_front().unwrap();
523+
if branches_map.contains_key(&current_commit.id) {
524+
log::debug!(
525+
"Found commit {:#?} as head of remote branch {:?}",
526+
current_commit,
527+
branches_map.get(&current_commit.id)
528+
);
529+
return Ok(Some(current_commit));
530+
}
531+
for parent_id in current_commit.parent_ids.iter() {
532+
let parent_commit = commit_reader.get_commit_by_id(parent_id)?;
533+
let Some(parent_commit) = parent_commit else {
534+
return Err(OxenError::local_parent_link_broken(&current_commit.id));
535+
};
536+
queue.push_back(parent_commit);
537+
}
538+
}
539+
Ok(None)
540+
}
541+
477542
async fn cannot_push_incomplete_history(
478543
local_repo: &LocalRepository,
479544
remote_repo: &RemoteRepository,
@@ -482,46 +547,70 @@ async fn cannot_push_incomplete_history(
482547
) -> Result<bool, OxenError> {
483548
log::debug!("Checking if we can push incomplete history.");
484549
match api::remote::commits::list_commit_history(remote_repo, &branch.name).await {
485-
Err(_) => {
486-
return Ok(!api::local::commits::commit_history_is_complete(
487-
local_repo, local_head,
488-
));
550+
Err(e) => {
551+
log::debug!("got err when checking remote history {:?}", e);
552+
let remote_branches = api::remote::branches::list(remote_repo).await?;
553+
log::debug!("got remote branches {:?}", remote_branches);
554+
555+
let maybe_remote_head =
556+
find_local_commit_matching_remote_branch(local_repo, local_head, &remote_branches)?;
557+
558+
log::debug!("got maybe_remote_head {:?}", maybe_remote_head);
559+
560+
let Some(remote_head) = maybe_remote_head else {
561+
// No commits in the local branch history match any remote branch head.
562+
// This is the empty repo case - require a complete history
563+
return Ok(!api::local::commits::commit_history_is_complete(
564+
local_repo, local_head,
565+
));
566+
};
567+
568+
// Otherwise, we have a remote head that matches a local commit - get commits between and check if they're synced
569+
return Ok(!commits_to_push_are_synced(
570+
local_repo,
571+
local_head,
572+
&remote_head,
573+
)?);
489574
}
490575
Ok(remote_history) => {
491-
let remote_head = remote_history.first().unwrap();
492576
log::debug!(
493-
"Checking between local head {:?} and remote head {:?} on branch {}",
577+
"got remote history {:#?} on branch {:#?}",
578+
remote_history,
579+
branch
580+
);
581+
let remote_head = remote_history.first().unwrap();
582+
return Ok(!commits_to_push_are_synced(
583+
local_repo,
494584
local_head,
495585
remote_head,
496-
branch.name
497-
);
586+
)?);
587+
}
588+
}
589+
}
498590

499-
let commit_reader = CommitReader::new(local_repo)?;
500-
let merger = Merger::new(local_repo)?;
591+
fn commits_to_push_are_synced(
592+
local_repo: &LocalRepository,
593+
local_head: &Commit,
594+
remote_head: &Commit,
595+
) -> Result<bool, OxenError> {
596+
let commit_reader = CommitReader::new(local_repo)?;
597+
let merger = Merger::new(local_repo)?;
501598

502-
let commits_to_push =
503-
merger.list_commits_between_commits(&commit_reader, remote_head, local_head)?;
504-
505-
let commits_to_push: Vec<Commit> = commits_to_push
506-
.into_iter()
507-
.filter(|commit| {
508-
!remote_history
509-
.iter()
510-
.any(|remote_commit| remote_commit.id == commit.id)
511-
})
512-
.collect();
513-
514-
log::debug!("Found the following commits_to_push: {:?}", commits_to_push);
515-
// Ensure all `commits_to_push` are synced
516-
for commit in commits_to_push {
517-
if !index::commit_sync_status::commit_is_synced(local_repo, &commit) {
518-
return Ok(true);
519-
}
520-
}
599+
let commits_to_push =
600+
merger.list_commits_between_commits(&commit_reader, remote_head, local_head)?;
601+
602+
log::debug!("got commits to push {:?}", commits_to_push);
603+
604+
for commit in commits_to_push {
605+
if !index::commit_sync_status::commit_is_synced(local_repo, &commit) {
606+
log::debug!("commit is not synced {:?}", commit);
607+
return Ok(false);
521608
}
522609
}
523610

524-
Ok(false)
611+
log::debug!("commits to push ARE synced!");
612+
613+
Ok(true)
525614
}
526615

527616
async fn poll_until_synced(

src/lib/src/core/index/schema_reader.rs

-36
Original file line numberDiff line numberDiff line change
@@ -182,42 +182,6 @@ impl SchemaReader {
182182

183183
pub fn list_schema_entries(&self) -> Result<Vec<SchemaEntry>, OxenError> {
184184
log::debug!("trying to get root hash for commit {:?}", self.commit_id);
185-
// Print out all the entries of dir_hashes db
186-
// let iter = self.dir_hashes_db.iterator(rocksdb::IteratorMode::Start);
187-
188-
// log::debug!(
189-
// "iterating over dir_hashes_db for commit {:?}",
190-
// self.commit_id
191-
// );
192-
// for item in iter {
193-
// match item {
194-
// Ok((key, value)) => {
195-
// let key = match std::str::from_utf8(&key) {
196-
// Ok(k) => k,
197-
// Err(e) => {
198-
// log::error!("Failed to convert key to string: {:?}", e);
199-
// continue;
200-
// }
201-
// };
202-
// let value = match std::str::from_utf8(&value) {
203-
// Ok(v) => v,
204-
// Err(e) => {
205-
// log::error!("Failed to convert value to string: {:?}", e);
206-
// continue;
207-
// }
208-
// };
209-
// log::debug!("key {:?} value {:?}", key, value);
210-
// }
211-
// Err(e) => {
212-
// log::error!("Iterator error: {:?}", e);
213-
// }
214-
// }
215-
// }
216-
217-
// log::debug!("done iterating");
218-
219-
// log::debug!("done iterating round 2");
220-
221185
let root_hash: String = path_db::get_entry(&self.dir_hashes_db, "")?.ok_or(
222186
OxenError::basic_str("Could not find root hash in dir hashes db"),
223187
)?;

tests/test_push_pull.rs

+50
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::path::Path;
22

33
use liboxen::api;
4+
use liboxen::api::remote;
45
use liboxen::command;
56
use liboxen::constants;
67
use liboxen::constants::DEFAULT_BRANCH_NAME;
@@ -644,3 +645,52 @@ async fn test_push_pull_moved_files() -> Result<(), OxenError> {
644645
})
645646
.await
646647
}
648+
649+
#[tokio::test]
650+
async fn test_push_new_branch_default_clone() -> Result<(), OxenError> {
651+
test::run_training_data_fully_sync_remote(|_local_repo, remote_repo| async move {
652+
let remote_repo_copy = remote_repo.clone();
653+
test::run_empty_dir_test_async(|repo_dir| async move {
654+
// Clone the remote repo
655+
let repo_dir = repo_dir.join("repoo");
656+
let cloned_repo = command::clone_url(&remote_repo.remote.url, &repo_dir).await?;
657+
658+
// Create-checkout a new branch
659+
let branch_name = "new-branch";
660+
command::create_checkout(&cloned_repo, branch_name)?;
661+
662+
// Add a file
663+
let contents = "this is the file";
664+
let path = &cloned_repo.path.join("a.txt");
665+
test::write_txt_file_to_path(path, contents)?;
666+
667+
command::add(&cloned_repo, path)?;
668+
let commit = command::commit(&cloned_repo, "Adding file for first time")?;
669+
670+
// Try to push upstream branch
671+
let push_result = command::push_remote_branch(
672+
&cloned_repo,
673+
constants::DEFAULT_REMOTE_NAME,
674+
branch_name,
675+
)
676+
.await;
677+
678+
log::debug!("Push result: {:?}", push_result);
679+
680+
assert!(push_result.is_ok());
681+
682+
// Get the remote branch
683+
let remote_branch = api::remote::branches::get_by_name(&remote_repo, branch_name)
684+
.await?
685+
.unwrap();
686+
687+
assert_eq!(remote_branch.commit_id, commit.id);
688+
689+
Ok(repo_dir)
690+
})
691+
.await?;
692+
693+
Ok(remote_repo_copy)
694+
})
695+
.await
696+
}

0 commit comments

Comments
 (0)