-
Notifications
You must be signed in to change notification settings - Fork 71
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add filer.Filer implementation backed by the Files API (#474)
## Tests: Added a new integration test covering the read/write parts that the other filers' tests exercise. The integration test cannot be shared with them just yet because the Files API does not yet support creating, listing, or removing directories.
- Loading branch information
Showing
3 changed files
with
368 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,241 @@ | ||
package filer | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"io/fs" | ||
"net/http" | ||
"net/url" | ||
"path" | ||
"strings" | ||
"time" | ||
|
||
"github.com/databricks/databricks-sdk-go" | ||
"github.com/databricks/databricks-sdk-go/apierr" | ||
"github.com/databricks/databricks-sdk-go/client" | ||
"golang.org/x/exp/slices" | ||
) | ||
|
||
// filesApiFileInfo is the fs.FileInfo implementation used for entries
// reported by the Files API.
type filesApiFileInfo struct {
	// absPath is the absolute path of the entry; Name derives from it.
	absPath string
	// isDir records whether the entry is a directory.
	isDir bool
}
|
||
func (info filesApiFileInfo) Name() string { | ||
return path.Base(info.absPath) | ||
} | ||
|
||
func (info filesApiFileInfo) Size() int64 { | ||
// No way to get the file size in the Files API. | ||
return 0 | ||
} | ||
|
||
func (info filesApiFileInfo) Mode() fs.FileMode { | ||
mode := fs.ModePerm | ||
if info.isDir { | ||
mode |= fs.ModeDir | ||
} | ||
return mode | ||
} | ||
|
||
func (info filesApiFileInfo) ModTime() time.Time { | ||
return time.Time{} | ||
} | ||
|
||
func (info filesApiFileInfo) IsDir() bool { | ||
return info.isDir | ||
} | ||
|
||
func (info filesApiFileInfo) Sys() any { | ||
return nil | ||
} | ||
|
||
// FilesClient implements the [Filer] interface for the Files API backend. | ||
type FilesClient struct { | ||
workspaceClient *databricks.WorkspaceClient | ||
apiClient *client.DatabricksClient | ||
|
||
// File operations will be relative to this path. | ||
root RootPath | ||
} | ||
|
||
// filesNotImplementedError builds the error returned by Filer methods
// that the Files API backend does not support. fn is the name of the
// unsupported method as it should appear in the message.
func filesNotImplementedError(fn string) error {
	const format = "filer.%s is not implemented for the Files API"
	return fmt.Errorf(format, fn)
}
|
||
func NewFilesClient(w *databricks.WorkspaceClient, root string) (Filer, error) { | ||
apiClient, err := client.New(w.Config) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return &FilesClient{ | ||
workspaceClient: w, | ||
apiClient: apiClient, | ||
|
||
root: NewRootPath(root), | ||
}, nil | ||
} | ||
|
||
func (w *FilesClient) urlPath(name string) (string, string, error) { | ||
absPath, err := w.root.Join(name) | ||
if err != nil { | ||
return "", "", err | ||
} | ||
|
||
// The user specified part of the path must be escaped. | ||
urlPath := fmt.Sprintf( | ||
"/api/2.0/fs/files/%s", | ||
url.PathEscape(strings.TrimLeft(absPath, "/")), | ||
) | ||
|
||
return absPath, urlPath, nil | ||
} | ||
|
||
func (w *FilesClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error { | ||
absPath, urlPath, err := w.urlPath(name) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
overwrite := slices.Contains(mode, OverwriteIfExists) | ||
urlPath = fmt.Sprintf("%s?overwrite=%t", urlPath, overwrite) | ||
err = w.apiClient.Do(ctx, http.MethodPut, urlPath, reader, nil, | ||
func(r *http.Request) error { | ||
r.Header.Set("Content-Type", "application/octet-stream") | ||
return nil | ||
}) | ||
|
||
// Return early on success. | ||
if err == nil { | ||
return nil | ||
} | ||
|
||
// Special handling of this error only if it is an API error. | ||
var aerr *apierr.APIError | ||
if !errors.As(err, &aerr) { | ||
return err | ||
} | ||
|
||
// This API returns 409 if the file already exists, when the object type is file | ||
if aerr.StatusCode == http.StatusConflict { | ||
return FileAlreadyExistsError{absPath} | ||
} | ||
|
||
return err | ||
} | ||
|
||
func (w *FilesClient) Read(ctx context.Context, name string) (io.ReadCloser, error) { | ||
absPath, urlPath, err := w.urlPath(name) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
var buf bytes.Buffer | ||
err = w.apiClient.Do(ctx, http.MethodGet, urlPath, nil, &buf) | ||
|
||
// Return early on success. | ||
if err == nil { | ||
return io.NopCloser(&buf), nil | ||
} | ||
|
||
// Special handling of this error only if it is an API error. | ||
var aerr *apierr.APIError | ||
if !errors.As(err, &aerr) { | ||
return nil, err | ||
} | ||
|
||
// This API returns a 404 if the specified path does not exist. | ||
if aerr.StatusCode == http.StatusNotFound { | ||
return nil, FileDoesNotExistError{absPath} | ||
} | ||
|
||
return nil, err | ||
} | ||
|
||
func (w *FilesClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error { | ||
absPath, urlPath, err := w.urlPath(name) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// Illegal to delete the root path. | ||
if absPath == w.root.rootPath { | ||
return CannotDeleteRootError{} | ||
} | ||
|
||
err = w.apiClient.Do(ctx, http.MethodDelete, urlPath, nil, nil) | ||
|
||
// Return early on success. | ||
if err == nil { | ||
return nil | ||
} | ||
|
||
// Special handling of this error only if it is an API error. | ||
var aerr *apierr.APIError | ||
if !errors.As(err, &aerr) { | ||
return err | ||
} | ||
|
||
// This API returns a 404 if the specified path does not exist. | ||
if aerr.StatusCode == http.StatusNotFound { | ||
return FileDoesNotExistError{absPath} | ||
} | ||
|
||
// This API returns 409 if the underlying path is a directory. | ||
if aerr.StatusCode == http.StatusConflict { | ||
return DirectoryNotEmptyError{absPath} | ||
} | ||
|
||
return err | ||
} | ||
|
||
func (w *FilesClient) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) { | ||
return nil, filesNotImplementedError("ReadDir") | ||
} | ||
|
||
func (w *FilesClient) Mkdir(ctx context.Context, name string) error { | ||
// Directories are created implicitly. | ||
// No need to do anything. | ||
return nil | ||
} | ||
|
||
func (w *FilesClient) Stat(ctx context.Context, name string) (fs.FileInfo, error) { | ||
absPath, urlPath, err := w.urlPath(name) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
err = w.apiClient.Do(ctx, http.MethodHead, urlPath, nil, nil, | ||
func(r *http.Request) error { | ||
r.Header.Del("Content-Type") | ||
return nil | ||
}) | ||
|
||
// If the HEAD requests succeeds, the file exists. | ||
if err == nil { | ||
return filesApiFileInfo{absPath: absPath, isDir: false}, nil | ||
} | ||
|
||
// Special handling of this error only if it is an API error. | ||
var aerr *apierr.APIError | ||
if !errors.As(err, &aerr) { | ||
return nil, err | ||
} | ||
|
||
// This API returns a 404 if the specified path does not exist. | ||
if aerr.StatusCode == http.StatusNotFound { | ||
return nil, FileDoesNotExistError{absPath} | ||
} | ||
|
||
// This API returns 409 if the underlying path is a directory. | ||
if aerr.StatusCode == http.StatusConflict { | ||
return filesApiFileInfo{absPath: absPath, isDir: true}, nil | ||
} | ||
|
||
return nil, err | ||
} |