diff --git a/internal/sync_test.go b/internal/sync_test.go index 89b318a970..09418a8556 100644 --- a/internal/sync_test.go +++ b/internal/sync_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "io" + "io/fs" "net/http" "os" "os/exec" @@ -15,6 +16,7 @@ import ( "time" _ "github.com/databricks/cli/cmd/sync" + "github.com/databricks/cli/libs/filer" "github.com/databricks/cli/libs/sync" "github.com/databricks/cli/libs/testfile" "github.com/databricks/databricks-sdk-go" @@ -63,6 +65,7 @@ type syncTest struct { t *testing.T c *cobraTestRunner w *databricks.WorkspaceClient + f filer.Filer localRoot string remoteRoot string } @@ -73,6 +76,8 @@ func setupSyncTest(t *testing.T, args ...string) *syncTest { w := databricks.Must(databricks.NewWorkspaceClient()) localRoot := t.TempDir() remoteRoot := temporaryWorkspaceDir(t, w) + f, err := filer.NewWorkspaceFilesClient(w, remoteRoot) + require.NoError(t, err) // Prepend common arguments. args = append([]string{ @@ -90,6 +95,7 @@ func setupSyncTest(t *testing.T, args ...string) *syncTest { t: t, c: c, w: w, + f: f, localRoot: localRoot, remoteRoot: remoteRoot, } @@ -160,6 +166,21 @@ func (a *syncTest) remoteFileContent(ctx context.Context, relativePath string, e }, 30*time.Second, 5*time.Second) } +func (a *syncTest) remoteNotExist(ctx context.Context, relativePath string) { + _, err := a.f.Stat(ctx, relativePath) + require.ErrorIs(a.t, err, fs.ErrNotExist) +} + +func (a *syncTest) remoteExists(ctx context.Context, relativePath string) { + _, err := a.f.Stat(ctx, relativePath) + require.NoError(a.t, err) +} + +func (a *syncTest) touchFile(ctx context.Context, path string) { + err := a.f.Write(ctx, path, strings.NewReader("contents"), filer.CreateParentDirectories) + require.NoError(a.t, err) +} + func (a *syncTest) objectType(ctx context.Context, relativePath string, expected string) { path := path.Join(a.remoteRoot, relativePath) @@ -297,11 +318,43 @@ func TestAccSyncNestedFolderSync(t *testing.T) { // delete f.Remove(t) 
assertSync.waitForCompletionMarker() - // directories are not cleaned up right now. This is not ideal - assertSync.remoteDirContent(ctx, "dir1/dir2/dir3", []string{}) + assertSync.remoteNotExist(ctx, "dir1") assertSync.snapshotContains(append(repoFiles, ".gitignore")) } +func TestAccSyncNestedFolderDoesntFailOnNonEmptyDirectory(t *testing.T) { + ctx := context.Background() + assertSync := setupSyncTest(t, "--watch") + + // .gitignore is created by the sync process to enforce .databricks is not synced + assertSync.waitForCompletionMarker() + assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore")) + + // New file + localFilePath := filepath.Join(assertSync.localRoot, "dir1/dir2/dir3/foo.txt") + err := os.MkdirAll(filepath.Dir(localFilePath), 0o755) + assert.NoError(t, err) + f := testfile.CreateFile(t, localFilePath) + defer f.Close(t) + assertSync.waitForCompletionMarker() + assertSync.remoteDirContent(ctx, "dir1/dir2/dir3", []string{"foo.txt"}) + + // Add file to dir1 to simulate a user writing to the workspace directly. + assertSync.touchFile(ctx, "dir1/foo.txt") + + // Remove original file. + f.Remove(t) + assertSync.waitForCompletionMarker() + + // Sync should have removed these directories. + assertSync.remoteNotExist(ctx, "dir1/dir2/dir3") + assertSync.remoteNotExist(ctx, "dir1/dir2") + + // Sync should have ignored not being able to delete dir1. + assertSync.remoteExists(ctx, "dir1/foo.txt") + assertSync.remoteExists(ctx, "dir1") +} + func TestAccSyncNestedSpacePlusAndHashAreEscapedSync(t *testing.T) { ctx := context.Background() assertSync := setupSyncTest(t, "--watch") @@ -326,12 +379,10 @@ func TestAccSyncNestedSpacePlusAndHashAreEscapedSync(t *testing.T) { // delete f.Remove(t) assertSync.waitForCompletionMarker() - // directories are not cleaned up right now. 
// diff describes the set of changes to apply to make the remote
// filesystem reflect the local filesystem.
type diff struct {
	// Files to delete from the remote (processed first).
	delete []string
	// Directories to remove from the remote (after file deletion; leaf-to-root).
	rmdir []string
	// Directories to create on the remote (before file upload).
	mkdir []string
	// Files to upload to the remote (processed last).
	put []string
}

// IsEmpty returns true if the diff contains no work to perform.
// All four change sets are checked so that a diff carrying only
// directory creations or removals is not mistaken for a no-op.
func (d diff) IsEmpty() bool {
	return len(d.put) == 0 && len(d.delete) == 0 &&
		len(d.mkdir) == 0 && len(d.rmdir) == 0
}

// groupedMkdir returns a slice of slices of paths to create.
// Because the underlying mkdir calls create intermediate directories,
// we can group them together to reduce the total number of calls.
// This returns a slice of a slice for parity with [groupedRmdir].
func (d diff) groupedMkdir() [][]string {
	// Compute the set of proper prefixes of all paths to create.
	prefixes := make(map[string]bool)
	for _, name := range d.mkdir {
		dir := path.Dir(name)
		for dir != "." && dir != "/" {
			prefixes[dir] = true
			dir = path.Dir(dir)
		}
	}

	var out []string

	// Collect all paths that are not a prefix of another path.
	// Creating only these leaves implicitly creates every intermediate.
	for _, name := range d.mkdir {
		if !prefixes[name] {
			out = append(out, name)
		}
	}

	return [][]string{out}
}

// groupedRmdir returns a slice of slices of paths to delete.
// The outer slice is ordered such that each inner slice can be
// deleted in parallel, as long as it is processed in order.
// The first entry will contain leaf directories, the second entry
// will contain intermediate directories, and so on.
func (d diff) groupedRmdir() [][]string {
	// Compute the number of times each directory is a prefix of another directory.
	prefixes := make(map[string]int)
	for _, dir := range d.rmdir {
		prefixes[dir] = 0
	}
	for _, dir := range d.rmdir {
		dir = path.Dir(dir)
		for dir != "." && dir != "/" {
			// Increment the prefix count for this directory, but only if it
			// is one of the directories we are deleting.
			if _, ok := prefixes[dir]; ok {
				prefixes[dir]++
			}
			dir = path.Dir(dir)
		}
	}

	var out [][]string

	for len(prefixes) > 0 {
		var toDelete []string

		// Find directories which are not a prefix of another directory.
		// These are the directories we can delete.
		for dir, count := range prefixes {
			if count == 0 {
				toDelete = append(toDelete, dir)
				delete(prefixes, dir)
			}
		}

		// Remove these directories from the prefixes map.
		for _, dir := range toDelete {
			dir = path.Dir(dir)
			for dir != "." && dir != "/" {
				// Decrement the prefix count for this directory, but only if it
				// is one of the directories we are deleting.
				if _, ok := prefixes[dir]; ok {
					prefixes[dir]--
				}
				dir = path.Dir(dir)
			}
		}

		// Add these directories to the output.
		out = append(out, toDelete)
	}

	return out
}
// DirSet is a set of directories.
type DirSet map[string]struct{}

// MakeDirSet turns a list of file paths into the complete set of directories
// that is needed to store them (including parent directories).
func MakeDirSet(files []string) DirSet {
	set := DirSet{}

	for _, file := range files {
		// Normalize the file's directory to /-separated form.
		parent := filepath.ToSlash(filepath.Dir(file))

		// Walk towards the root, stopping early once we reach a directory
		// that was already recorded (its ancestors are then present too).
		for parent != "." {
			if _, seen := set[parent]; seen {
				break
			}
			set[parent] = struct{}{}
			parent = path.Dir(parent)
		}
	}

	return set
}

// Slice returns a sorted copy of the dirset elements as a slice.
func (s DirSet) Slice() []string {
	dirs := make([]string, 0, len(s))
	for dir := range s {
		dirs = append(dirs, dir)
	}
	sort.Strings(dirs)
	return dirs
}

// Remove returns the set difference of two DirSets.
func (s DirSet) Remove(other DirSet) DirSet {
	result := DirSet{}
	for dir := range s {
		if _, found := other[dir]; !found {
			result[dir] = struct{}{}
		}
	}
	return result
}
b/libs/sync/repofiles/repofiles.go deleted file mode 100644 index 8fcabc113e..0000000000 --- a/libs/sync/repofiles/repofiles.go +++ /dev/null @@ -1,159 +0,0 @@ -package repofiles - -import ( - "context" - "errors" - "fmt" - "net/http" - "net/url" - "os" - "path" - "path/filepath" - "strings" - - "github.com/databricks/databricks-sdk-go" - "github.com/databricks/databricks-sdk-go/apierr" - "github.com/databricks/databricks-sdk-go/client" - "github.com/databricks/databricks-sdk-go/service/workspace" -) - -// RepoFiles wraps reading and writing into a remote repo with safeguards to prevent -// accidental deletion of repos and more robust methods to overwrite workspace files -type RepoFiles struct { - repoRoot string - localRoot string - workspaceClient *databricks.WorkspaceClient -} - -func Create(repoRoot, localRoot string, workspaceClient *databricks.WorkspaceClient) *RepoFiles { - return &RepoFiles{ - repoRoot: repoRoot, - localRoot: localRoot, - workspaceClient: workspaceClient, - } -} - -func (r *RepoFiles) remotePath(relativePath string) (string, error) { - fullPath := path.Join(r.repoRoot, relativePath) - cleanFullPath := path.Clean(fullPath) - if !strings.HasPrefix(cleanFullPath, r.repoRoot) { - return "", fmt.Errorf("relative file path is not inside repo root: %s", relativePath) - } - // path.Clean will remove any trailing / so it's enough to check cleanFullPath == r.repoRoot - if cleanFullPath == r.repoRoot { - return "", fmt.Errorf("file path relative to repo root cannot be empty: %s", relativePath) - } - return cleanFullPath, nil -} - -func (r *RepoFiles) readLocal(relativePath string) ([]byte, error) { - localPath := filepath.Join(r.localRoot, relativePath) - return os.ReadFile(localPath) -} - -func (r *RepoFiles) writeRemote(ctx context.Context, relativePath string, content []byte) error { - apiClientConfig := r.workspaceClient.Config - apiClientConfig.HTTPTimeoutSeconds = 600 - apiClient, err := client.New(apiClientConfig) - if err != nil { - return err 
- } - remotePath, err := r.remotePath(relativePath) - if err != nil { - return err - } - escapedPath := url.PathEscape(strings.TrimLeft(remotePath, "/")) - apiPath := fmt.Sprintf("/api/2.0/workspace-files/import-file/%s?overwrite=true", escapedPath) - - err = apiClient.Do(ctx, http.MethodPost, apiPath, content, nil) - - // Handling some edge cases when an upload might fail - // - // We cannot do more precise error scoping here because the API does not - // provide descriptive errors yet - // - // TODO: narrow down the error condition scope of this "if" block to only - // trigger for the specific edge cases instead of all errors once the API - // implements them - if err != nil { - // Delete any artifact files incase non overwriteable by the current file - // type and thus are failing the PUT request. - // files, folders and notebooks might not have been cleaned up and they - // can't overwrite each other. If a folder `foo` exists, then attempts to - // PUT a file `foo` will fail - err := r.workspaceClient.Workspace.Delete(ctx, - workspace.Delete{ - Path: remotePath, - Recursive: true, - }, - ) - // ignore RESOURCE_DOES_NOT_EXIST here incase nothing existed at remotePath - var aerr *apierr.APIError - if errors.As(err, &aerr) && aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" { - err = nil - } - if err != nil { - return err - } - - // Mkdir parent dirs incase they are what's causing the PUT request to - // fail - err = r.workspaceClient.Workspace.MkdirsByPath(ctx, path.Dir(remotePath)) - if err != nil { - return fmt.Errorf("could not mkdir to put file: %s", err) - } - - // Attempt to upload file again after cleanup/setup - err = apiClient.Do(ctx, http.MethodPost, apiPath, content, nil) - if err != nil { - return err - } - } - return nil -} - -func (r *RepoFiles) deleteRemote(ctx context.Context, relativePath string) error { - remotePath, err := r.remotePath(relativePath) - if err != nil { - return err - } - return r.workspaceClient.Workspace.Delete(ctx, - 
workspace.Delete{ - Path: remotePath, - Recursive: false, - }, - ) -} - -// The API calls for a python script foo.py would be -// `PUT foo.py` -// `DELETE foo.py` -// -// The API calls for a python notebook foo.py would be -// `PUT foo.py` -// `DELETE foo` -// -// The workspace file system backend strips .py from the file name if the python -// file is a notebook -func (r *RepoFiles) PutFile(ctx context.Context, relativePath string) error { - content, err := r.readLocal(relativePath) - if err != nil { - return err - } - - return r.writeRemote(ctx, relativePath, content) -} - -func (r *RepoFiles) DeleteFile(ctx context.Context, relativePath string) error { - err := r.deleteRemote(ctx, relativePath) - - // We explictly ignore RESOURCE_DOES_NOT_EXIST error to make delete idempotent - var aerr *apierr.APIError - if errors.As(err, &aerr) && aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" { - err = nil - } - return nil -} - -// TODO: write integration tests for all non happy path cases that rely on -// specific behaviour of the workspace apis diff --git a/libs/sync/repofiles/repofiles_test.go b/libs/sync/repofiles/repofiles_test.go deleted file mode 100644 index 2a881d90d0..0000000000 --- a/libs/sync/repofiles/repofiles_test.go +++ /dev/null @@ -1,88 +0,0 @@ -package repofiles - -import ( - "os" - "path/filepath" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestRepoFilesRemotePath(t *testing.T) { - repoRoot := "/Repos/doraemon/bar" - repoFiles := Create(repoRoot, "/doraemon/foo/bar", nil) - - remotePath, err := repoFiles.remotePath("a/b/c") - assert.NoError(t, err) - assert.Equal(t, repoRoot+"/a/b/c", remotePath) - - remotePath, err = repoFiles.remotePath("a/b/../d") - assert.NoError(t, err) - assert.Equal(t, repoRoot+"/a/d", remotePath) - - remotePath, err = repoFiles.remotePath("a/../c") - assert.NoError(t, err) - assert.Equal(t, repoRoot+"/c", remotePath) - - remotePath, err = repoFiles.remotePath("a/b/c/.") - assert.NoError(t, err) - assert.Equal(t, 
repoRoot+"/a/b/c", remotePath) - - remotePath, err = repoFiles.remotePath("a/b/c/d/./../../f/g") - assert.NoError(t, err) - assert.Equal(t, repoRoot+"/a/b/f/g", remotePath) - - _, err = repoFiles.remotePath("..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ..`) - - _, err = repoFiles.remotePath("a/../..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: a/../..`) - - _, err = repoFiles.remotePath("./../.") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../.`) - - _, err = repoFiles.remotePath("/./.././..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: /./.././..`) - - _, err = repoFiles.remotePath("./../.") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../.`) - - _, err = repoFiles.remotePath("./..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ./..`) - - _, err = repoFiles.remotePath("./../../..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../../..`) - - _, err = repoFiles.remotePath("./../a/./b../../..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../a/./b../../..`) - - _, err = repoFiles.remotePath("../..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ../..`) - - _, err = repoFiles.remotePath(".//a/..//./b/..") - assert.ErrorContains(t, err, `file path relative to repo root cannot be empty`) - - _, err = repoFiles.remotePath("a/b/../..") - assert.ErrorContains(t, err, "file path relative to repo root cannot be empty") - - _, err = repoFiles.remotePath("") - assert.ErrorContains(t, err, "file path relative to repo root cannot be empty") - - _, err = repoFiles.remotePath(".") - assert.ErrorContains(t, err, "file path relative to repo root cannot be empty") - - _, err = repoFiles.remotePath("/") - assert.ErrorContains(t, err, "file path relative to repo root cannot be empty") -} - -func 
TestRepoReadLocal(t *testing.T) { - tempDir := t.TempDir() - helloPath := filepath.Join(tempDir, "hello.txt") - err := os.WriteFile(helloPath, []byte("my name is doraemon :P"), os.ModePerm) - assert.NoError(t, err) - - repoFiles := Create("/Repos/doraemon/bar", tempDir, nil) - bytes, err := repoFiles.readLocal("./a/../hello.txt") - assert.NoError(t, err) - assert.Equal(t, "my name is doraemon :P", string(bytes)) -} diff --git a/libs/sync/snapshot.go b/libs/sync/snapshot.go index 1ea7b18bdb..1680f04626 100644 --- a/libs/sync/snapshot.go +++ b/libs/sync/snapshot.go @@ -15,6 +15,7 @@ import ( "github.com/databricks/cli/libs/fileset" "github.com/databricks/cli/libs/log" "github.com/databricks/cli/libs/notebook" + "golang.org/x/exp/maps" ) // Bump it up every time a potentially breaking change is made to the snapshot schema @@ -183,6 +184,18 @@ func (s *Snapshot) diff(ctx context.Context, all []fileset.File) (change diff, e localFileSet[f.Relative] = struct{}{} } + // Capture both previous and current set of files. + previousFiles := maps.Keys(lastModifiedTimes) + currentFiles := maps.Keys(localFileSet) + + // Build directory sets to figure out which directories to create and which to remove. + previousDirectories := MakeDirSet(previousFiles) + currentDirectories := MakeDirSet(currentFiles) + + // Create new directories; remove stale directories. 
+ change.mkdir = currentDirectories.Remove(previousDirectories).Slice() + change.rmdir = previousDirectories.Remove(currentDirectories).Slice() + for _, f := range all { // get current modified timestamp modified := f.Modified() @@ -252,6 +265,7 @@ func (s *Snapshot) diff(ctx context.Context, all []fileset.File) (change diff, e // add them to a delete batch change.delete = append(change.delete, remoteName) } + // and remove them from the snapshot for _, remoteName := range change.delete { // we do note assert that remoteName exists in remoteToLocalNames since it @@ -262,5 +276,6 @@ func (s *Snapshot) diff(ctx context.Context, all []fileset.File) (change diff, e delete(remoteToLocalNames, remoteName) delete(localToRemoteNames, localName) } + return } diff --git a/libs/sync/snapshot_test.go b/libs/sync/snapshot_test.go index 8154b79141..c2e8f6b80c 100644 --- a/libs/sync/snapshot_test.go +++ b/libs/sync/snapshot_test.go @@ -139,7 +139,10 @@ func TestFolderDiff(t *testing.T) { change, err := state.diff(ctx, files) assert.NoError(t, err) assert.Len(t, change.delete, 0) + assert.Len(t, change.rmdir, 0) + assert.Len(t, change.mkdir, 1) assert.Len(t, change.put, 1) + assert.Contains(t, change.mkdir, "foo") assert.Contains(t, change.put, "foo/bar.py") f1.Remove(t) @@ -148,8 +151,11 @@ func TestFolderDiff(t *testing.T) { change, err = state.diff(ctx, files) assert.NoError(t, err) assert.Len(t, change.delete, 1) + assert.Len(t, change.rmdir, 1) + assert.Len(t, change.mkdir, 0) assert.Len(t, change.put, 0) assert.Contains(t, change.delete, "foo/bar") + assert.Contains(t, change.rmdir, "foo") } func TestPythonNotebookDiff(t *testing.T) { diff --git a/libs/sync/sync.go b/libs/sync/sync.go index 54d0624e77..5c4c9d8f68 100644 --- a/libs/sync/sync.go +++ b/libs/sync/sync.go @@ -5,9 +5,9 @@ import ( "fmt" "time" + "github.com/databricks/cli/libs/filer" "github.com/databricks/cli/libs/git" "github.com/databricks/cli/libs/log" - "github.com/databricks/cli/libs/sync/repofiles" 
"github.com/databricks/databricks-sdk-go" ) @@ -29,9 +29,9 @@ type SyncOptions struct { type Sync struct { *SyncOptions - fileSet *git.FileSet - snapshot *Snapshot - repoFiles *repofiles.RepoFiles + fileSet *git.FileSet + snapshot *Snapshot + filer filer.Filer // Synchronization progress events are sent to this event notifier. notifier EventNotifier @@ -77,16 +77,19 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) { } } - repoFiles := repofiles.Create(opts.RemotePath, opts.LocalPath, opts.WorkspaceClient) + filer, err := filer.NewWorkspaceFilesClient(opts.WorkspaceClient, opts.RemotePath) + if err != nil { + return nil, err + } return &Sync{ SyncOptions: &opts, - fileSet: fileSet, - snapshot: snapshot, - repoFiles: repoFiles, - notifier: &NopNotifier{}, - seq: 0, + fileSet: fileSet, + snapshot: snapshot, + filer: filer, + notifier: &NopNotifier{}, + seq: 0, }, nil } diff --git a/libs/sync/watchdog.go b/libs/sync/watchdog.go index 3e7acccc2c..b0c96e01c0 100644 --- a/libs/sync/watchdog.go +++ b/libs/sync/watchdog.go @@ -2,36 +2,81 @@ package sync import ( "context" + "errors" + "io/fs" + "os" + "path/filepath" + "github.com/databricks/cli/libs/filer" + "github.com/databricks/cli/libs/log" "golang.org/x/sync/errgroup" ) // Maximum number of concurrent requests during sync. const MaxRequestsInFlight = 20 -// Perform a DELETE of the specified remote path. -func (s *Sync) applyDelete(ctx context.Context, group *errgroup.Group, remoteName string) { - // Return early if the context has already been cancelled. - select { - case <-ctx.Done(): - return - default: - // Proceed. +// Delete the specified path. 
+func (s *Sync) applyDelete(ctx context.Context, remoteName string) error { + s.notifyProgress(ctx, EventActionDelete, remoteName, 0.0) + + err := s.filer.Delete(ctx, remoteName) + if err != nil && !errors.Is(err, fs.ErrNotExist) { + return err } - group.Go(func() error { - s.notifyProgress(ctx, EventActionDelete, remoteName, 0.0) - err := s.repoFiles.DeleteFile(ctx, remoteName) - if err != nil { - return err - } - s.notifyProgress(ctx, EventActionDelete, remoteName, 1.0) - return nil - }) + s.notifyProgress(ctx, EventActionDelete, remoteName, 1.0) + return nil +} + +// Remove the directory at the specified path. +func (s *Sync) applyRmdir(ctx context.Context, remoteName string) error { + s.notifyProgress(ctx, EventActionDelete, remoteName, 0.0) + + err := s.filer.Delete(ctx, remoteName) + if err != nil { + // Directory deletion is opportunistic, so we ignore errors. + log.Debugf(ctx, "error removing directory %s: %s", remoteName, err) + } + + s.notifyProgress(ctx, EventActionDelete, remoteName, 1.0) + return nil +} + +// Create a directory at the specified path. +func (s *Sync) applyMkdir(ctx context.Context, localName string) error { + s.notifyProgress(ctx, EventActionPut, localName, 0.0) + + err := s.filer.Mkdir(ctx, localName) + if err != nil { + return err + } + + s.notifyProgress(ctx, EventActionPut, localName, 1.0) + return nil } // Perform a PUT of the specified local path. -func (s *Sync) applyPut(ctx context.Context, group *errgroup.Group, localName string) { +func (s *Sync) applyPut(ctx context.Context, localName string) error { + s.notifyProgress(ctx, EventActionPut, localName, 0.0) + + localFile, err := os.Open(filepath.Join(s.LocalPath, localName)) + if err != nil { + return err + } + + defer localFile.Close() + + opts := []filer.WriteMode{filer.CreateParentDirectories, filer.OverwriteIfExists} + err = s.filer.Write(ctx, localName, localFile, opts...) 
+ if err != nil { + return err + } + + s.notifyProgress(ctx, EventActionPut, localName, 1.0) + return nil +} + +func groupRunSingle(ctx context.Context, group *errgroup.Group, fn func(context.Context, string) error, path string) { // Return early if the context has already been cancelled. select { case <-ctx.Done(): @@ -41,28 +86,49 @@ func (s *Sync) applyPut(ctx context.Context, group *errgroup.Group, localName st } group.Go(func() error { - s.notifyProgress(ctx, EventActionPut, localName, 0.0) - err := s.repoFiles.PutFile(ctx, localName) - if err != nil { - return err - } - s.notifyProgress(ctx, EventActionPut, localName, 1.0) - return nil + return fn(ctx, path) }) } -func (s *Sync) applyDiff(ctx context.Context, d diff) error { +func groupRunParallel(ctx context.Context, paths []string, fn func(context.Context, string) error) error { group, ctx := errgroup.WithContext(ctx) group.SetLimit(MaxRequestsInFlight) - for _, remoteName := range d.delete { - s.applyDelete(ctx, group, remoteName) - } - - for _, localName := range d.put { - s.applyPut(ctx, group, localName) + for _, path := range paths { + groupRunSingle(ctx, group, fn, path) } // Wait for goroutines to finish and return first non-nil error return if any. return group.Wait() } + +func (s *Sync) applyDiff(ctx context.Context, d diff) error { + var err error + + // Delete files in parallel. + err = groupRunParallel(ctx, d.delete, s.applyDelete) + if err != nil { + return err + } + + // Delete directories ordered by depth from leaf to root. + for _, group := range d.groupedRmdir() { + err = groupRunParallel(ctx, group, s.applyRmdir) + if err != nil { + return err + } + } + + // Create directories (leafs only because intermediates are created automatically). + for _, group := range d.groupedMkdir() { + err = groupRunParallel(ctx, group, s.applyMkdir) + if err != nil { + return err + } + } + + // Put files in parallel. + err = groupRunParallel(ctx, d.put, s.applyPut) + + return err +}