Added bundle generate pipeline command #1139

Merged · 5 commits · Jan 25, 2024
bundle/config/generate/pipeline.go (19 additions, 0 deletions)
@@ -0,0 +1,19 @@
package generate

import (
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/dyn/yamlsaver"
"github.com/databricks/databricks-sdk-go/service/pipelines"
)

var pipelineOrder = yamlsaver.NewOrder([]string{"name", "clusters", "configuration", "libraries"})

func ConvertPipelineToValue(pipeline *pipelines.PipelineSpec) (dyn.Value, error) {
value := make(map[string]dyn.Value)

// We ignore the following fields:
// - id: this is a read-only field
// - storage: changes to this field are rare because changing the storage recreates pipeline-related resources
// - edition: this field is rarely changed
return yamlsaver.ConvertToMapValue(pipeline, pipelineOrder, []string{"id", "storage", "edition"}, value)
}
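
For context, a minimal sketch of how this converter is meant to be used; it mirrors the wiring in cmd/bundle/generate/pipeline.go further down. The spec contents, resource key, and output path here are illustrative, not taken from the PR:

```go
package example

import (
	"path/filepath"

	"github.com/databricks/cli/bundle/config/generate"
	"github.com/databricks/cli/libs/dyn"
	"github.com/databricks/cli/libs/dyn/yamlsaver"
	"github.com/databricks/databricks-sdk-go/service/pipelines"
)

// writeExampleConfig converts a pipeline spec to a dyn.Value, nests it under
// resources.pipelines, and writes it as YAML, just like the generate command.
func writeExampleConfig() error {
	spec := &pipelines.PipelineSpec{
		Name: "example",
		Libraries: []pipelines.PipelineLibrary{
			{Notebook: &pipelines.NotebookLibrary{Path: "/Workspace/example/notebook"}},
		},
	}

	v, err := generate.ConvertPipelineToValue(spec)
	if err != nil {
		return err
	}

	result := map[string]dyn.Value{
		"resources": dyn.V(map[string]dyn.Value{
			"pipelines": dyn.V(map[string]dyn.Value{
				"pipeline_example": v, // resource key chosen for illustration
			}),
		}),
	}

	// Field order in the output follows pipelineOrder: name, clusters, configuration, libraries.
	return yamlsaver.SaveAsYAML(result, filepath.Join("resources", "pipeline_example.yml"), false)
}
```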
cmd/bundle/generate.go (1 addition, 0 deletions)
@@ -14,5 +14,6 @@ func newGenerateCommand() *cobra.Command {
}

cmd.AddCommand(generate.NewGenerateJobCommand())
cmd.AddCommand(generate.NewGeneratePipelineCommand())
return cmd
}
cmd/bundle/generate/generate_test.go (98 additions, 0 deletions)
@@ -0,0 +1,98 @@
package generate

import (
"bytes"
"context"
"fmt"
"io"
"os"
"path/filepath"
"testing"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/databricks-sdk-go/experimental/mocks"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/databricks/databricks-sdk-go/service/workspace"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestGeneratePipelineCommand(t *testing.T) {
cmd := NewGeneratePipelineCommand()

root := t.TempDir()
b := &bundle.Bundle{
Config: config.Root{
Path: root,
},
}

m := mocks.NewMockWorkspaceClient(t)
b.SetWorkpaceClient(m.WorkspaceClient)
pipelineApi := m.GetMockPipelinesAPI()
pipelineApi.EXPECT().Get(mock.Anything, pipelines.GetPipelineRequest{PipelineId: "test-pipeline"}).Return(&pipelines.GetPipelineResponse{
PipelineId: "test-pipeline",
Name: "test-pipeline",
Spec: &pipelines.PipelineSpec{
Name: "test-pipeline",
Libraries: []pipelines.PipelineLibrary{
{Notebook: &pipelines.NotebookLibrary{
Path: "/test/notebook",
}},
{File: &pipelines.FileLibrary{
Path: "/test/file.py",
}},
},
},
}, nil)

workspaceApi := m.GetMockWorkspaceAPI()
workspaceApi.EXPECT().GetStatusByPath(mock.Anything, "/test/notebook").Return(&workspace.ObjectInfo{
ObjectType: workspace.ObjectTypeNotebook,
Language: workspace.LanguagePython,
Path: "/test/notebook",
}, nil)

workspaceApi.EXPECT().GetStatusByPath(mock.Anything, "/test/file.py").Return(&workspace.ObjectInfo{
ObjectType: workspace.ObjectTypeFile,
Path: "/test/file.py",
}, nil)

notebookContent := io.NopCloser(bytes.NewBufferString("# Databricks notebook source\nNotebook content"))
pyContent := io.NopCloser(bytes.NewBufferString("Py content"))
workspaceApi.EXPECT().Download(mock.Anything, "/test/notebook", mock.Anything).Return(notebookContent, nil)
workspaceApi.EXPECT().Download(mock.Anything, "/test/file.py", mock.Anything).Return(pyContent, nil)

cmd.SetContext(bundle.Context(context.Background(), b))
cmd.Flag("existing-pipeline-id").Value.Set("test-pipeline")

configDir := filepath.Join(root, "resources")
cmd.Flag("config-dir").Value.Set(configDir)

srcDir := filepath.Join(root, "src")
cmd.Flag("source-dir").Value.Set(srcDir)
err := cmd.RunE(cmd, []string{})
require.NoError(t, err)

data, err := os.ReadFile(filepath.Join(configDir, "pipeline_test_pipeline.yml"))
require.NoError(t, err)
require.Equal(t, fmt.Sprintf(`resources:
pipelines:
pipeline_test_pipeline:
name: test-pipeline
libraries:
- notebook:
path: %s
- file:
path: %s
`, filepath.Join("..", "src", "notebook.py"), filepath.Join("..", "src", "file.py")), string(data))

data, err = os.ReadFile(filepath.Join(srcDir, "notebook.py"))
require.NoError(t, err)
require.Equal(t, "# Databricks notebook source\nNotebook content", string(data))

data, err = os.ReadFile(filepath.Join(srcDir, "file.py"))
require.NoError(t, err)
require.Equal(t, "Py content", string(data))
}
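
The `../src/...` paths expected in the YAML above come from the downloader rewriting each workspace path to the location of the downloaded copy, relative to the config directory (see markNotebookForDownload and markFileForDownload in cmd/bundle/generate/utils.go below). A minimal sketch of that relative-path computation, with hypothetical directories:

```go
package example

import "path/filepath"

// relativeLibraryPath shows the rewrite the downloader performs: a file saved
// into sourceDir is referenced from the generated config relative to configDir.
func relativeLibraryPath(configDir, sourceDir, filename string) (string, error) {
	targetPath := filepath.Join(sourceDir, filename)
	return filepath.Rel(configDir, targetPath)
}

// relativeLibraryPath("/tmp/bundle/resources", "/tmp/bundle/src", "notebook.py")
// returns "../src/notebook.py", matching the expected output in the test.
```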
cmd/bundle/generate/job.go (2 additions, 2 deletions)
@@ -50,9 +50,9 @@ func NewGenerateJobCommand() *cobra.Command {
return err
}

downloader := newNotebookDownloader(w, sourceDir, configDir)
downloader := newDownloader(w, sourceDir, configDir)
for _, task := range job.Settings.Tasks {
err := downloader.MarkForDownload(ctx, &task)
err := downloader.MarkTaskForDownload(ctx, &task)
if err != nil {
return err
}
cmd/bundle/generate/pipeline.go (91 additions, 0 deletions)
@@ -0,0 +1,91 @@
package generate

import (
"fmt"
"os"
"path/filepath"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config/generate"
"github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/dyn/yamlsaver"
"github.com/databricks/cli/libs/textutil"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/spf13/cobra"
)

func NewGeneratePipelineCommand() *cobra.Command {
var configDir string
var sourceDir string
var pipelineId string
var force bool

cmd := &cobra.Command{
Use: "pipeline",
Short: "Generate bundle configuration for a pipeline",
PreRunE: root.MustConfigureBundle,
}

cmd.Flags().StringVar(&pipelineId, "existing-pipeline-id", "", `ID of the pipeline to generate config for`)
cmd.MarkFlagRequired("existing-pipeline-id")

wd, err := os.Getwd()
if err != nil {
wd = "."
}

cmd.Flags().StringVarP(&configDir, "config-dir", "d", filepath.Join(wd, "resources"), `Dir path where the output config will be stored`)
cmd.Flags().StringVarP(&sourceDir, "source-dir", "s", filepath.Join(wd, "src"), `Dir path where the downloaded files will be stored`)
cmd.Flags().BoolVarP(&force, "force", "f", false, `Force overwrite existing files in the output directory`)

cmd.RunE = func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
b := bundle.Get(ctx)
w := b.WorkspaceClient()

pipeline, err := w.Pipelines.Get(ctx, pipelines.GetPipelineRequest{PipelineId: pipelineId})
if err != nil {
return err
}

downloader := newDownloader(w, sourceDir, configDir)
for _, lib := range pipeline.Spec.Libraries {
err := downloader.MarkPipelineLibraryForDownload(ctx, &lib)
if err != nil {
return err
}
}

v, err := generate.ConvertPipelineToValue(pipeline.Spec)
if err != nil {
return err
}

jobKey := fmt.Sprintf("pipeline_%s", textutil.NormalizeString(pipeline.Name))
result := map[string]dyn.Value{
"resources": dyn.V(map[string]dyn.Value{
"pipelines": dyn.V(map[string]dyn.Value{
jobKey: v,
}),
}),
}

err = downloader.FlushToDisk(ctx, force)
if err != nil {
return err
}

filename := filepath.Join(configDir, fmt.Sprintf("%s.yml", jobKey))
err = yamlsaver.SaveAsYAML(result, filename, force)
if err != nil {
return err
}

cmdio.LogString(ctx, fmt.Sprintf("Pipeline configuration successfully saved to %s", filename))
return nil
}

return cmd
}
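
Taken together with the flags above, the new subcommand would be invoked roughly as `databricks bundle generate pipeline --existing-pipeline-id <id>` from inside a bundle root, optionally with `-d/--config-dir`, `-s/--source-dir`, and `-f/--force` (this reading follows from the flag definitions and `MustConfigureBundle` above, not from separate documentation). It fetches the pipeline spec, downloads the referenced notebooks and files into the source directory, and writes the configuration to `<config-dir>/pipeline_<normalized pipeline name>.yml`.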
cmd/bundle/generate/utils.go (53 additions, 16 deletions)
@@ -12,51 +12,88 @@ import (
"github.com/databricks/cli/libs/notebook"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"golang.org/x/sync/errgroup"
)

type notebookDownloader struct {
notebooks map[string]string
type downloader struct {
files map[string]string
w *databricks.WorkspaceClient
sourceDir string
configDir string
}

func (n *notebookDownloader) MarkForDownload(ctx context.Context, task *jobs.Task) error {
func (n *downloader) MarkTaskForDownload(ctx context.Context, task *jobs.Task) error {
if task.NotebookTask == nil {
return nil
}

info, err := n.w.Workspace.GetStatusByPath(ctx, task.NotebookTask.NotebookPath)
return n.markNotebookForDownload(ctx, &task.NotebookTask.NotebookPath)
}

func (n *downloader) MarkPipelineLibraryForDownload(ctx context.Context, lib *pipelines.PipelineLibrary) error {
if lib.Notebook != nil {
return n.markNotebookForDownload(ctx, &lib.Notebook.Path)
}

if lib.File != nil {
return n.markFileForDownload(ctx, &lib.File.Path)
}

return nil
}

func (n *downloader) markFileForDownload(ctx context.Context, filePath *string) error {
_, err := n.w.Workspace.GetStatusByPath(ctx, *filePath)
if err != nil {
return err
}

filename := path.Base(*filePath)
targetPath := filepath.Join(n.sourceDir, filename)

n.files[targetPath] = *filePath

rel, err := filepath.Rel(n.configDir, targetPath)
if err != nil {
return err
}

*filePath = rel
return nil
}

func (n *downloader) markNotebookForDownload(ctx context.Context, notebookPath *string) error {
info, err := n.w.Workspace.GetStatusByPath(ctx, *notebookPath)
if err != nil {
return err
}

ext := notebook.GetExtensionByLanguage(info)

filename := path.Base(task.NotebookTask.NotebookPath) + ext
filename := path.Base(*notebookPath) + ext
targetPath := filepath.Join(n.sourceDir, filename)

n.notebooks[targetPath] = task.NotebookTask.NotebookPath
n.files[targetPath] = *notebookPath

// Update the notebook path to be relative to the config dir
rel, err := filepath.Rel(n.configDir, targetPath)
if err != nil {
return err
}

task.NotebookTask.NotebookPath = rel
*notebookPath = rel
return nil
}

func (n *notebookDownloader) FlushToDisk(ctx context.Context, force bool) error {
func (n *downloader) FlushToDisk(ctx context.Context, force bool) error {
err := os.MkdirAll(n.sourceDir, 0755)
if err != nil {
return err
}

// First check that all files can be written
for targetPath := range n.notebooks {
for targetPath := range n.files {
info, err := os.Stat(targetPath)
if err == nil {
if info.IsDir() {
@@ -69,11 +106,11 @@ func (n *notebookDownloader) FlushToDisk(ctx context.Context, force bool) error
}

errs, errCtx := errgroup.WithContext(ctx)
for k, v := range n.notebooks {
for k, v := range n.files {
targetPath := k
notebookPath := v
filePath := v
errs.Go(func() error {
reader, err := n.w.Workspace.Download(errCtx, notebookPath)
reader, err := n.w.Workspace.Download(errCtx, filePath)
if err != nil {
return err
}
Expand All @@ -89,17 +126,17 @@ func (n *notebookDownloader) FlushToDisk(ctx context.Context, force bool) error
return err
}

cmdio.LogString(errCtx, fmt.Sprintf("Notebook successfully saved to %s", targetPath))
cmdio.LogString(errCtx, fmt.Sprintf("File successfully saved to %s", targetPath))
return reader.Close()
})
}

return errs.Wait()
}

func newNotebookDownloader(w *databricks.WorkspaceClient, sourceDir string, configDir string) *notebookDownloader {
return &notebookDownloader{
notebooks: make(map[string]string),
func newDownloader(w *databricks.WorkspaceClient, sourceDir string, configDir string) *downloader {
return &downloader{
files: make(map[string]string),
w: w,
sourceDir: sourceDir,
configDir: configDir,
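
The refactor above generalizes the job-only notebook downloader into a shared downloader used by both generate commands. A sketch of the intended mark-then-flush call pattern, mirroring the command code (it would have to live inside this `generate` package since the identifiers are unexported; variable names and error handling are illustrative):

```go
package generate

import (
	"context"

	"github.com/databricks/databricks-sdk-go"
	"github.com/databricks/databricks-sdk-go/service/pipelines"
)

// downloadPipelineLibraries marks every pipeline library for download, which
// also rewrites the library paths (through their pointer fields) to be relative
// to configDir, then downloads all marked files into sourceDir.
func downloadPipelineLibraries(ctx context.Context, w *databricks.WorkspaceClient, spec *pipelines.PipelineSpec, sourceDir, configDir string, force bool) error {
	d := newDownloader(w, sourceDir, configDir)
	for _, lib := range spec.Libraries {
		if err := d.MarkPipelineLibraryForDownload(ctx, &lib); err != nil {
			return err
		}
	}
	return d.FlushToDisk(ctx, force)
}
```

Splitting marking from flushing lets the path rewrite happen before the configuration is serialized, while the actual downloads run concurrently inside FlushToDisk via errgroup.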