From 77cf2dc2bcffc1ead81587c7b66226333a927d00 Mon Sep 17 00:00:00 2001 From: Chief-Rishab Date: Tue, 31 Jan 2023 12:37:37 +0530 Subject: [PATCH 1/6] GCS: add sink, client and documentation --- plugins/sinks/gcs/README.md | 41 +++++++++ plugins/sinks/gcs/client.go | 49 +++++++++++ plugins/sinks/gcs/gcs.go | 161 ++++++++++++++++++++++++++++++++++++ plugins/sinks/populate.go | 1 + 4 files changed, 252 insertions(+) create mode 100644 plugins/sinks/gcs/README.md create mode 100644 plugins/sinks/gcs/client.go create mode 100644 plugins/sinks/gcs/gcs.go diff --git a/plugins/sinks/gcs/README.md b/plugins/sinks/gcs/README.md new file mode 100644 index 000000000..5a18f1f49 --- /dev/null +++ b/plugins/sinks/gcs/README.md @@ -0,0 +1,41 @@ +# GCS + +Sinks json data to a file in `ndjson` format in a Google Cloud Storage bucket + +## Usage +```yaml +sinks: + - name: gcs + config: + project_id: google-project-id + path: bucket_name/target_folder + object_prefix : github-users + service_account_base64: + service_account_json: + { + "type": "service_account", + "private_key_id": "xxxxxxx", + "private_key": "xxxxxxx", + "client_email": "xxxxxxx", + "client_id": "xxxxxxx", + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "https://oauth2.googleapis.com/token", + "auth_provider_x509_cert_url": "xxxxxxx", + "client_x509_cert_url": "xxxxxxx", + } +``` + +## Config Definition + +| Key | Value | Example | Description | | +| :-- | :---- | :------ | :---------- | :-- | +|`project_id` | `string` | `google-project-id` | Google Cloud Storage Project ID | *required*| +| `path` | `string` | `bucket_name/target_folder` | the path of the folder where new object is to be put | *required* | +| `object_prefix` | `string` | `github-users` | the .ndjson file name prefix where json data will be inserted with timestamp Note: If prefix is not provided, the output data will be put in a `timestamp.ndjson` file in the provided path. Otherwise in the given example the output file will be `github-users-timestamp.ndjson`| *optional* | +| `service_account_base64` | `string` | `ewog....fQo=` | Service Account Key in base64 encoded string. Takes precedence over `service_account_json` value | *optional* | +| `service_account_json` | `string` | `{"private_key": .., "private_id": ...}` | Service Account Key in JSON string | *optional* | + + +## Contributing + +Refer to the [contribution guidelines](../../../docs/docs/contribute/guide.md#adding-a-new-extractor) for information on contributing to this module. diff --git a/plugins/sinks/gcs/client.go b/plugins/sinks/gcs/client.go new file mode 100644 index 000000000..14f427768 --- /dev/null +++ b/plugins/sinks/gcs/client.go @@ -0,0 +1,49 @@ +package gcs + +import ( + "context" + + "cloud.google.com/go/storage" + "github.com/pkg/errors" + "google.golang.org/api/option" +) + +type GCSClient interface { + WriteData([]byte) error + Close() error +} + +type gcsClient struct { + client *storage.Client + writer *storage.Writer +} + +func newGCSClient(ctx context.Context, serviceAccountJSON []byte, bucketname string, filepath string) (GCSClient, error) { + client, err := storage.NewClient(ctx, option.WithCredentialsJSON(serviceAccountJSON)) + if err != nil { + return nil, errors.Wrap(err, "error in creating client") + } + + writer := client.Bucket(bucketname).Object(filepath).NewWriter(ctx) + + return &gcsClient{ + client: client, + writer: writer, + }, nil +} + +func (c *gcsClient) WriteData(jsonBytes []byte) error { + if _, err := c.writer.Write(jsonBytes); err != nil { + return errors.Wrap(err, "error in writing json data to an object") + } + + return nil +} + +func (c *gcsClient) Close() error { + if err := c.writer.Close(); err != nil { + return errors.Wrap(err, "error closing the writer") + } + + return nil +} diff --git a/plugins/sinks/gcs/gcs.go b/plugins/sinks/gcs/gcs.go new file mode 100644 index 000000000..783829dc9 --- /dev/null +++ b/plugins/sinks/gcs/gcs.go @@ -0,0 +1,161 @@ +package gcs + +import ( + "context" + _ "embed" + "encoding/base64" + "strings" + "time" + + "github.com/MakeNowJust/heredoc" + "github.com/odpf/meteor/models" + assetsv1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2" + "github.com/odpf/meteor/plugins" + "github.com/odpf/meteor/registry" + "github.com/odpf/salt/log" + "github.com/pkg/errors" +) + +//go:embed README.md +var summary string + +type Config struct { + ProjectID string `mapstructure:"project_id" validate:"required"` + Path string `mapstructure:"path" validate:"required"` + ObjectPrefix string `mapstructure:"object_prefix"` + ServiceAccountJSON string `mapstructure:"service_account_json"` + ServiceAccountBase64 string `mapstructure:"service_account_base64"` +} + +var info = plugins.Info{ + Description: "saves data in google cloud storage bucket", + Summary: summary, + SampleConfig: heredoc.Doc(` + project_id: google-project-id + path: bucket_name/target_folder + object_prefix : github-users + service_account_base64: ____base64_encoded_service_account____ + service_account_json: |- + { + "type": "service_account", + "private_key_id": "xxxxxxx", + "private_key": "xxxxxxx", + "client_email": "xxxxxxx", + "client_id": "xxxxxxx", + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "https://oauth2.googleapis.com/token", + "auth_provider_x509_cert_url": "xxxxxxx", + "client_x509_cert_url": "xxxxxxx" + } + `), + Tags: []string{"gcs", "sink"}, +} + +type Sink struct { + plugins.BasePlugin + logger log.Logger + client GCSClient + config Config +} + +func New(logger log.Logger) plugins.Syncer { + s := &Sink{ + logger: logger, + } + s.BasePlugin = plugins.NewBasePlugin(info, &s.config) + + return s +} + +func (s *Sink) Init(ctx context.Context, config plugins.Config) (err error) { + if err = s.BasePlugin.Init(ctx, config); err != nil { + return err + } + + if err := s.validateServiceAccountKey(); err != nil { + return err + } + + bucketname, objectname := s.resolveBucketandObjectNames() + + if s.client, err = newGCSClient(ctx, []byte(s.config.ServiceAccountJSON), bucketname, objectname); err != nil { + return err + } + + return nil +} + +func (s *Sink) validateServiceAccountKey() error { + + if s.config.ServiceAccountBase64 == "" && s.config.ServiceAccountJSON == "" { + return errors.New("credentials are not specified, failed to create client") + } + + if s.config.ServiceAccountBase64 != "" { + serviceAccountJSON, err := base64.StdEncoding.DecodeString(s.config.ServiceAccountBase64) + if err != nil || len(serviceAccountJSON) == 0 { + return errors.Wrap(err, "failed to decode base64 service account") + } + s.config.ServiceAccountJSON = string(serviceAccountJSON) + } + return nil +} + +func (s *Sink) resolveBucketandObjectNames() (string, string) { + dirs := strings.Split(s.config.Path, "/") + bucketname := dirs[0] + timestamp := time.Now().Format("2006.01.02 15:04:05") + + if s.config.ObjectPrefix != "" { + s.config.ObjectPrefix = s.config.ObjectPrefix + "-" + } + + objectname := s.config.ObjectPrefix + timestamp + ".ndjson" + if len(dirs) > 1 { + objectname = dirs[len(dirs)-1] + "/" + s.config.ObjectPrefix + timestamp + ".ndjson" + } + + return bucketname, objectname +} + +func (s *Sink) Sink(ctx context.Context, batch []models.Record) (err error) { + data := make([]*assetsv1beta2.Asset, 0, len(batch)) + + for _, record := range batch { + data = append(data, record.Data()) + } + if err = s.writeData(data); err != nil { + return errors.Wrap(err, "error in writing data to the object") + } + return nil +} + +func (s *Sink) writeData(data []*assetsv1beta2.Asset) (err error) { + for _, asset := range data { + jsonBytes, _ := models.ToJSON(asset) + + if err := s.client.WriteData(jsonBytes); err != nil { + return err + } + + if err := s.client.WriteData([]byte("\n")); err != nil { + return err + } + } + return nil +} + +func (s *Sink) Close() (err error) { + if err := s.client.Close(); err != nil { + return err + } + return nil +} + +func init() { + if err := registry.Sinks.Register("gcs", func() plugins.Syncer { + return New(plugins.GetLog()) + }); err != nil { + panic(err) + } +} diff --git a/plugins/sinks/populate.go b/plugins/sinks/populate.go index 8906e5796..0f309915c 100644 --- a/plugins/sinks/populate.go +++ b/plugins/sinks/populate.go @@ -4,6 +4,7 @@ import ( _ "github.com/odpf/meteor/plugins/sinks/compass" _ "github.com/odpf/meteor/plugins/sinks/console" _ "github.com/odpf/meteor/plugins/sinks/file" + _ "github.com/odpf/meteor/plugins/sinks/gcs" _ "github.com/odpf/meteor/plugins/sinks/http" _ "github.com/odpf/meteor/plugins/sinks/kafka" _ "github.com/odpf/meteor/plugins/sinks/shield" From 0021f3ba96bbc0d709d4dbf4bcde33f80f8012ab Mon Sep 17 00:00:00 2001 From: Chief-Rishab Date: Tue, 31 Jan 2023 13:19:38 +0530 Subject: [PATCH 2/6] GCS: add tests --- plugins/sinks/gcs/gcs_test.go | 112 ++++++++++++++++++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 plugins/sinks/gcs/gcs_test.go diff --git a/plugins/sinks/gcs/gcs_test.go b/plugins/sinks/gcs/gcs_test.go new file mode 100644 index 000000000..552b63420 --- /dev/null +++ b/plugins/sinks/gcs/gcs_test.go @@ -0,0 +1,112 @@ +//go:build plugins +// +build plugins +package gcs_test + +import ( + "context" + "testing" + + "github.com/odpf/meteor/models" + v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2" + "github.com/odpf/meteor/plugins" + g "github.com/odpf/meteor/plugins/sinks/gcs" + testUtils "github.com/odpf/meteor/test/utils" + "github.com/odpf/meteor/utils" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "google.golang.org/protobuf/types/known/anypb" +) + +var validConfig = map[string]interface{}{ + "project_id": "google-project-id", + "path": "bucket_name/target_folder", + "object_prefix": "github-users", + "service_account_base64": "base 64 encoded key", +} + +func TestInit(t *testing.T) { + + t.Run("should return error if config is invalid", func(t *testing.T) { + gcsSink := g.New(testUtils.Logger) + actualError := gcsSink.Init(context.TODO(), plugins.Config{RawConfig: map[string]interface{}{ + "project_id": "", + }}) + assert.ErrorAs(t, actualError, &plugins.InvalidConfigError{}) + }) + + t.Run("should retun error if service account json and service account base64 missing", func(t *testing.T) { + + gcsSink := g.New(testUtils.Logger) + actualError := gcsSink.Init(context.TODO(), plugins.Config{RawConfig: map[string]interface{}{ + "project_id": "google-project-id", + "path": "bucket_name/target_folder", + }}) + assert.ErrorContains(t, actualError, "credentials are not specified, failed to create client") + }) + + t.Run("should retun error if unable to decode base64 service account key", func(t *testing.T) { + + gcsSink := g.New(testUtils.Logger) + actualError := gcsSink.Init(context.TODO(), plugins.Config{RawConfig: map[string]interface{}{ + "project_id": "google-project-id", + "path": "bucket_name/target_folder", + "service_account_base64": "----", // invalid + }}) + assert.ErrorContains(t, actualError, "failed to decode base64 service account") + }) +} + +func TestSink(t *testing.T) { + + t.Run("should write data in bucket and return nil error on success", func(t *testing.T) { + u := &v1beta2.User{ + FullName: "John Doe", + Email: "john.doe@odpf.com", + Attributes: utils.TryParseMapToProto(map[string]interface{}{ + "org_unit_path": "/", + "aliases": "doe.john@odpf.com,johndoe@odpf.com", + }), + } + user, _ := anypb.New(u) + data := &v1beta2.Asset{ + Data: user, + } + jsonBytes, _ := models.ToJSON(data) + + ctx := context.TODO() + client := new(mockClient) + client.On("WriteData", jsonBytes).Return(nil) + client.On("WriteData", []byte("\n")).Return(nil) + + gcsSink := g.New(testUtils.Logger) + + err := gcsSink.Init(context.TODO(), plugins.Config{RawConfig: map[string]interface{}{ + "project_id": "google-project-id", + "path": "bucket_name/target_folder", + "service_account_json": `{"type": "service_account"}`, + }}) + if err != nil { + t.Fatal(err) + } + + err = gcsSink.Sink(ctx, []models.Record{models.NewRecord(data)}) + + assert.NoError(t, err) + }) +} + +type mockClient struct { + mock.Mock +} + +func (m *mockClient) WriteData(jsonBytes []byte) error { + args := m.Called(jsonBytes) + + return args.Error(0) +} + +func (m *mockClient) Close() error { + args := m.Called() + + return args.Error(0) +} From 693528865741a510216cbd2efa3952c3cdd2a62a Mon Sep 17 00:00:00 2001 From: Chief-Rishab Date: Tue, 31 Jan 2023 13:23:24 +0530 Subject: [PATCH 3/6] fix: spacing --- plugins/sinks/gcs/gcs_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/plugins/sinks/gcs/gcs_test.go b/plugins/sinks/gcs/gcs_test.go index 552b63420..b7fdacdf7 100644 --- a/plugins/sinks/gcs/gcs_test.go +++ b/plugins/sinks/gcs/gcs_test.go @@ -1,5 +1,6 @@ //go:build plugins // +build plugins + package gcs_test import ( From be3cac66125654c69dd7578ccd3467d67c2eeefa Mon Sep 17 00:00:00 2001 From: Chief-Rishab Date: Tue, 31 Jan 2023 15:48:45 +0530 Subject: [PATCH 4/6] chore: apply changes from suggestions --- plugins/sinks/gcs/README.md | 2 +- plugins/sinks/gcs/client.go | 24 +++++++++--------------- plugins/sinks/gcs/gcs.go | 30 +++++++++++++++--------------- plugins/sinks/gcs/gcs_test.go | 12 ++++++------ 4 files changed, 31 insertions(+), 37 deletions(-) diff --git a/plugins/sinks/gcs/README.md b/plugins/sinks/gcs/README.md index 5a18f1f49..192d663dc 100644 --- a/plugins/sinks/gcs/README.md +++ b/plugins/sinks/gcs/README.md @@ -1,6 +1,6 @@ # GCS -Sinks json data to a file in `ndjson` format in a Google Cloud Storage bucket +Sinks json data to a file as ndjson format in Google Cloud Storage bucket ## Usage ```yaml diff --git a/plugins/sinks/gcs/client.go b/plugins/sinks/gcs/client.go index 14f427768..177428aff 100644 --- a/plugins/sinks/gcs/client.go +++ b/plugins/sinks/gcs/client.go @@ -8,17 +8,16 @@ import ( "google.golang.org/api/option" ) -type GCSClient interface { +type Writer interface { WriteData([]byte) error Close() error } -type gcsClient struct { - client *storage.Client +type gcsWriter struct { writer *storage.Writer } -func newGCSClient(ctx context.Context, serviceAccountJSON []byte, bucketname string, filepath string) (GCSClient, error) { +func newWriter(ctx context.Context, serviceAccountJSON []byte, bucketname string, filepath string) (*gcsWriter, error) { client, err := storage.NewClient(ctx, option.WithCredentialsJSON(serviceAccountJSON)) if err != nil { return nil, errors.Wrap(err, "error in creating client") @@ -26,24 +25,19 @@ func newGCSClient(ctx context.Context, serviceAccountJSON []byte, bucketname str writer := client.Bucket(bucketname).Object(filepath).NewWriter(ctx) - return &gcsClient{ - client: client, + return &gcsWriter{ writer: writer, }, nil } -func (c *gcsClient) WriteData(jsonBytes []byte) error { - if _, err := c.writer.Write(jsonBytes); err != nil { - return errors.Wrap(err, "error in writing json data to an object") +func (c *gcsWriter) WriteData(data []byte) error { + if _, err := c.writer.Write(data); err != nil { + return errors.Wrap(err, "error in writing data to an object") } return nil } -func (c *gcsClient) Close() error { - if err := c.writer.Close(); err != nil { - return errors.Wrap(err, "error closing the writer") - } - - return nil +func (c *gcsWriter) Close() error { + return c.writer.Close() } diff --git a/plugins/sinks/gcs/gcs.go b/plugins/sinks/gcs/gcs.go index 783829dc9..483b356c8 100644 --- a/plugins/sinks/gcs/gcs.go +++ b/plugins/sinks/gcs/gcs.go @@ -6,6 +6,7 @@ import ( "encoding/base64" "strings" "time" + "fmt" "github.com/MakeNowJust/heredoc" "github.com/odpf/meteor/models" @@ -54,7 +55,7 @@ var info = plugins.Info{ type Sink struct { plugins.BasePlugin logger log.Logger - client GCSClient + writer Writer config Config } @@ -78,7 +79,7 @@ func (s *Sink) Init(ctx context.Context, config plugins.Config) (err error) { bucketname, objectname := s.resolveBucketandObjectNames() - if s.client, err = newGCSClient(ctx, []byte(s.config.ServiceAccountJSON), bucketname, objectname); err != nil { + if s.writer, err = newWriter(ctx, []byte(s.config.ServiceAccountJSON), bucketname, objectname); err != nil { return err } @@ -86,7 +87,6 @@ func (s *Sink) Init(ctx context.Context, config plugins.Config) (err error) { } func (s *Sink) validateServiceAccountKey() error { - if s.config.ServiceAccountBase64 == "" && s.config.ServiceAccountJSON == "" { return errors.New("credentials are not specified, failed to create client") } @@ -104,15 +104,18 @@ func (s *Sink) validateServiceAccountKey() error { func (s *Sink) resolveBucketandObjectNames() (string, string) { dirs := strings.Split(s.config.Path, "/") bucketname := dirs[0] - timestamp := time.Now().Format("2006.01.02 15:04:05") + timestamp := time.Now().Format(time.RFC3339) - if s.config.ObjectPrefix != "" { - s.config.ObjectPrefix = s.config.ObjectPrefix + "-" + objectprefix := s.config.ObjectPrefix + + if objectprefix != "" && objectprefix[len(objectprefix)-1] !='-'{ + objectprefix = fmt.Sprintf("%s-", objectprefix) } - - objectname := s.config.ObjectPrefix + timestamp + ".ndjson" + + objectname := fmt.Sprintf("%s%s.ndjson", objectprefix, timestamp) + if len(dirs) > 1 { - objectname = dirs[len(dirs)-1] + "/" + s.config.ObjectPrefix + timestamp + ".ndjson" + objectname = fmt.Sprintf("%s/%s%s.ndjson", dirs[len(dirs)-1], objectprefix, timestamp) } return bucketname, objectname @@ -134,11 +137,11 @@ func (s *Sink) writeData(data []*assetsv1beta2.Asset) (err error) { for _, asset := range data { jsonBytes, _ := models.ToJSON(asset) - if err := s.client.WriteData(jsonBytes); err != nil { + if err := s.writer.WriteData(jsonBytes); err != nil { return err } - if err := s.client.WriteData([]byte("\n")); err != nil { + if err := s.writer.WriteData([]byte("\n")); err != nil { return err } } @@ -146,10 +149,7 @@ func (s *Sink) writeData(data []*assetsv1beta2.Asset) (err error) { } func (s *Sink) Close() (err error) { - if err := s.client.Close(); err != nil { - return err - } - return nil + return s.writer.Close() } func init() { diff --git a/plugins/sinks/gcs/gcs_test.go b/plugins/sinks/gcs/gcs_test.go index b7fdacdf7..03638f88b 100644 --- a/plugins/sinks/gcs/gcs_test.go +++ b/plugins/sinks/gcs/gcs_test.go @@ -75,9 +75,9 @@ func TestSink(t *testing.T) { jsonBytes, _ := models.ToJSON(data) ctx := context.TODO() - client := new(mockClient) - client.On("WriteData", jsonBytes).Return(nil) - client.On("WriteData", []byte("\n")).Return(nil) + writer := new(mockWriter) + writer.On("WriteData", jsonBytes).Return(nil) + writer.On("WriteData", []byte("\n")).Return(nil) gcsSink := g.New(testUtils.Logger) @@ -96,17 +96,17 @@ func TestSink(t *testing.T) { }) } -type mockClient struct { +type mockWriter struct { mock.Mock } -func (m *mockClient) WriteData(jsonBytes []byte) error { +func (m *mockWriter) WriteData(jsonBytes []byte) error { args := m.Called(jsonBytes) return args.Error(0) } -func (m *mockClient) Close() error { +func (m *mockWriter) Close() error { args := m.Called() return args.Error(0) From b2fa8193093ae77afc506ac5deec7eb812e66ea9 Mon Sep 17 00:00:00 2001 From: Chief-Rishab Date: Tue, 31 Jan 2023 16:47:13 +0530 Subject: [PATCH 5/6] docs: add gcs sink --- docs/docs/concepts/sink.md | 26 ++++++++++++++++++++++++++ docs/docs/reference/sinks.md | 27 +++++++++++++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/docs/docs/concepts/sink.md b/docs/docs/concepts/sink.md index 0be8a7d94..772acf719 100644 --- a/docs/docs/concepts/sink.md +++ b/docs/docs/concepts/sink.md @@ -46,6 +46,32 @@ sinks: Sinks metadata to a file in `json/yaml` format as per the config defined. +* **Google Cloud Storage** + +```yaml +sinks: + - name: gcs + config: + project_id: google-project-id + path: bucket_name/target_folder + object_prefix : github-users + service_account_base64: + service_account_json: + { + "type": "service_account", + "private_key_id": "xxxxxxx", + "private_key": "xxxxxxx", + "client_email": "xxxxxxx", + "client_id": "xxxxxxx", + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "https://oauth2.googleapis.com/token", + "auth_provider_x509_cert_url": "xxxxxxx", + "client_x509_cert_url": "xxxxxxx", + } +``` + +Sinks json data to a file as ndjson format in Google Cloud Storage bucket + * **http** ```yaml diff --git a/docs/docs/reference/sinks.md b/docs/docs/reference/sinks.md index 827bd9da9..992f549ae 100644 --- a/docs/docs/reference/sinks.md +++ b/docs/docs/reference/sinks.md @@ -46,6 +46,33 @@ sinks: path: "./dir/sample.yaml" format: "yaml" ``` +## GCS + +`Google Cloud Storage` + +Sinks json data to a file as ndjson format in Google Cloud Storage bucket + +```yaml +sinks: + - name: gcs + config: + project_id: google-project-id + path: bucket_name/target_folder + object_prefix : github-users + service_account_base64: + service_account_json: + { + "type": "service_account", + "private_key_id": "xxxxxxx", + "private_key": "xxxxxxx", + "client_email": "xxxxxxx", + "client_id": "xxxxxxx", + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "https://oauth2.googleapis.com/token", + "auth_provider_x509_cert_url": "xxxxxxx", + "client_x509_cert_url": "xxxxxxx", + } +``` ## http From 8e764a02ec2390b486893e0af596f5de07fc6f5a Mon Sep 17 00:00:00 2001 From: Chief-Rishab Date: Mon, 6 Feb 2023 23:52:12 +0530 Subject: [PATCH 6/6] chore: apply changes from review --- docs/docs/concepts/sink.md | 2 +- docs/docs/reference/sinks.md | 2 +- plugins/sinks/gcs/README.md | 4 ++-- plugins/sinks/gcs/gcs.go | 34 ++++++++++++++++++---------------- plugins/sinks/gcs/gcs_test.go | 10 +++++----- 5 files changed, 27 insertions(+), 25 deletions(-) diff --git a/docs/docs/concepts/sink.md b/docs/docs/concepts/sink.md index 772acf719..cfd85ae48 100644 --- a/docs/docs/concepts/sink.md +++ b/docs/docs/concepts/sink.md @@ -53,7 +53,7 @@ sinks: - name: gcs config: project_id: google-project-id - path: bucket_name/target_folder + url: gcs://bucket_name/target_folder object_prefix : github-users service_account_base64: service_account_json: diff --git a/docs/docs/reference/sinks.md b/docs/docs/reference/sinks.md index 992f549ae..9159b1ed6 100644 --- a/docs/docs/reference/sinks.md +++ b/docs/docs/reference/sinks.md @@ -57,7 +57,7 @@ sinks: - name: gcs config: project_id: google-project-id - path: bucket_name/target_folder + url: gcs://bucket_name/target_folder object_prefix : github-users service_account_base64: service_account_json: diff --git a/plugins/sinks/gcs/README.md b/plugins/sinks/gcs/README.md index 192d663dc..15ec5da7f 100644 --- a/plugins/sinks/gcs/README.md +++ b/plugins/sinks/gcs/README.md @@ -8,7 +8,7 @@ sinks: - name: gcs config: project_id: google-project-id - path: bucket_name/target_folder + url: gcs://bucket_name/target_folder object_prefix : github-users service_account_base64: service_account_json: @@ -30,7 +30,7 @@ sinks: | Key | Value | Example | Description | | | :-- | :---- | :------ | :---------- | :-- | |`project_id` | `string` | `google-project-id` | Google Cloud Storage Project ID | *required*| -| `path` | `string` | `bucket_name/target_folder` | the path of the folder where new object is to be put | *required* | +| `url` | `string` | `gcs://bucket_name/target_folder` | the URL with bucket name and path of the folder with format `gcs:///` | *required* | | `object_prefix` | `string` | `github-users` | the .ndjson file name prefix where json data will be inserted with timestamp Note: If prefix is not provided, the output data will be put in a `timestamp.ndjson` file in the provided path. Otherwise in the given example the output file will be `github-users-timestamp.ndjson`| *optional* | | `service_account_base64` | `string` | `ewog....fQo=` | Service Account Key in base64 encoded string. Takes precedence over `service_account_json` value | *optional* | | `service_account_json` | `string` | `{"private_key": .., "private_id": ...}` | Service Account Key in JSON string | *optional* | diff --git a/plugins/sinks/gcs/gcs.go b/plugins/sinks/gcs/gcs.go index 483b356c8..2b6250679 100644 --- a/plugins/sinks/gcs/gcs.go +++ b/plugins/sinks/gcs/gcs.go @@ -4,9 +4,9 @@ import ( "context" _ "embed" "encoding/base64" - "strings" - "time" "fmt" + "net/url" + "time" "github.com/MakeNowJust/heredoc" "github.com/odpf/meteor/models" @@ -22,7 +22,7 @@ var summary string type Config struct { ProjectID string `mapstructure:"project_id" validate:"required"` - Path string `mapstructure:"path" validate:"required"` + URL string `mapstructure:"url" validate:"required"` ObjectPrefix string `mapstructure:"object_prefix"` ServiceAccountJSON string `mapstructure:"service_account_json"` ServiceAccountBase64 string `mapstructure:"service_account_base64"` @@ -33,7 +33,7 @@ var info = plugins.Info{ Summary: summary, SampleConfig: heredoc.Doc(` project_id: google-project-id - path: bucket_name/target_folder + url: gcs://bucket_name/target_folder object_prefix : github-users service_account_base64: ____base64_encoded_service_account____ service_account_json: |- @@ -77,7 +77,7 @@ func (s *Sink) Init(ctx context.Context, config plugins.Config) (err error) { return err } - bucketname, objectname := s.resolveBucketandObjectNames() + bucketname, objectname := s.resolveBucketPath() if s.writer, err = newWriter(ctx, []byte(s.config.ServiceAccountJSON), bucketname, objectname); err != nil { return err @@ -101,21 +101,23 @@ func (s *Sink) validateServiceAccountKey() error { return nil } -func (s *Sink) resolveBucketandObjectNames() (string, string) { - dirs := strings.Split(s.config.Path, "/") - bucketname := dirs[0] +func (s *Sink) resolveBucketPath() (string, string) { + result, _ := url.Parse(s.config.URL) + + bucketname := result.Host + path := result.Path[1:] timestamp := time.Now().Format(time.RFC3339) - objectprefix := s.config.ObjectPrefix - - if objectprefix != "" && objectprefix[len(objectprefix)-1] !='-'{ + objectprefix := s.config.ObjectPrefix + + if objectprefix != "" && objectprefix[len(objectprefix)-1] != '-' { objectprefix = fmt.Sprintf("%s-", objectprefix) } - + objectname := fmt.Sprintf("%s%s.ndjson", objectprefix, timestamp) - - if len(dirs) > 1 { - objectname = fmt.Sprintf("%s/%s%s.ndjson", dirs[len(dirs)-1], objectprefix, timestamp) + + if path != "" { + objectname = fmt.Sprintf("%s/%s%s.ndjson", path, objectprefix, timestamp) } return bucketname, objectname @@ -148,7 +150,7 @@ func (s *Sink) writeData(data []*assetsv1beta2.Asset) (err error) { return nil } -func (s *Sink) Close() (err error) { +func (s *Sink) Close() error { return s.writer.Close() } diff --git a/plugins/sinks/gcs/gcs_test.go b/plugins/sinks/gcs/gcs_test.go index 03638f88b..174b56941 100644 --- a/plugins/sinks/gcs/gcs_test.go +++ b/plugins/sinks/gcs/gcs_test.go @@ -20,7 +20,7 @@ import ( var validConfig = map[string]interface{}{ "project_id": "google-project-id", - "path": "bucket_name/target_folder", + "url": "gcs://bucket_name/target_folder", "object_prefix": "github-users", "service_account_base64": "base 64 encoded key", } @@ -40,7 +40,7 @@ func TestInit(t *testing.T) { gcsSink := g.New(testUtils.Logger) actualError := gcsSink.Init(context.TODO(), plugins.Config{RawConfig: map[string]interface{}{ "project_id": "google-project-id", - "path": "bucket_name/target_folder", + "url": "gcs://bucket_name/target_folder", }}) assert.ErrorContains(t, actualError, "credentials are not specified, failed to create client") }) @@ -50,7 +50,7 @@ func TestInit(t *testing.T) { gcsSink := g.New(testUtils.Logger) actualError := gcsSink.Init(context.TODO(), plugins.Config{RawConfig: map[string]interface{}{ "project_id": "google-project-id", - "path": "bucket_name/target_folder", + "url": "gcs://bucket_name/target_folder", "service_account_base64": "----", // invalid }}) assert.ErrorContains(t, actualError, "failed to decode base64 service account") @@ -58,7 +58,7 @@ func TestInit(t *testing.T) { } func TestSink(t *testing.T) { - + t.Run("should write data in bucket and return nil error on success", func(t *testing.T) { u := &v1beta2.User{ FullName: "John Doe", @@ -83,7 +83,7 @@ func TestSink(t *testing.T) { err := gcsSink.Init(context.TODO(), plugins.Config{RawConfig: map[string]interface{}{ "project_id": "google-project-id", - "path": "bucket_name/target_folder", + "url": "gcs://bucket_name/target_folder", "service_account_json": `{"type": "service_account"}`, }}) if err != nil {