diff --git a/docs/docs/concepts/sink.md b/docs/docs/concepts/sink.md
index 0be8a7d94..cfd85ae48 100644
--- a/docs/docs/concepts/sink.md
+++ b/docs/docs/concepts/sink.md
@@ -46,6 +46,32 @@ sinks:

 Sinks metadata to a file in `json/yaml` format as per the config defined.

+* **Google Cloud Storage**
+
+```yaml
+sinks:
+  - name: gcs
+    config:
+      project_id: google-project-id
+      url: gcs://bucket_name/target_folder
+      object_prefix: github-users
+      service_account_base64:
+      service_account_json: |-
+        {
+          "type": "service_account",
+          "private_key_id": "xxxxxxx",
+          "private_key": "xxxxxxx",
+          "client_email": "xxxxxxx",
+          "client_id": "xxxxxxx",
+          "auth_uri": "https://accounts.google.com/o/oauth2/auth",
+          "token_uri": "https://oauth2.googleapis.com/token",
+          "auth_provider_x509_cert_url": "xxxxxxx",
+          "client_x509_cert_url": "xxxxxxx"
+        }
+```
+
+Sinks JSON data to a Google Cloud Storage bucket in ndjson (newline-delimited JSON) format.
+
 * **http**

 ```yaml
diff --git a/docs/docs/reference/sinks.md b/docs/docs/reference/sinks.md
index 827bd9da9..9159b1ed6 100644
--- a/docs/docs/reference/sinks.md
+++ b/docs/docs/reference/sinks.md
@@ -46,6 +46,33 @@ sinks:
     path: "./dir/sample.yaml"
     format: "yaml"
 ```
+## GCS
+
+`Google Cloud Storage`
+
+Sinks JSON data to a Google Cloud Storage bucket in ndjson (newline-delimited JSON) format.
+
+```yaml
+sinks:
+  - name: gcs
+    config:
+      project_id: google-project-id
+      url: gcs://bucket_name/target_folder
+      object_prefix: github-users
+      service_account_base64:
+      service_account_json: |-
+        {
+          "type": "service_account",
+          "private_key_id": "xxxxxxx",
+          "private_key": "xxxxxxx",
+          "client_email": "xxxxxxx",
+          "client_id": "xxxxxxx",
+          "auth_uri": "https://accounts.google.com/o/oauth2/auth",
+          "token_uri": "https://oauth2.googleapis.com/token",
+          "auth_provider_x509_cert_url": "xxxxxxx",
+          "client_x509_cert_url": "xxxxxxx"
+        }
+```

 ## http
diff --git a/plugins/sinks/gcs/README.md b/plugins/sinks/gcs/README.md
new file mode 100644
index 000000000..15ec5da7f
--- /dev/null
+++ b/plugins/sinks/gcs/README.md
@@ -0,0 +1,41 @@
+# GCS
+
+Sinks JSON data to a Google Cloud Storage bucket in ndjson (newline-delimited JSON) format.
+
+## Usage
+```yaml
+sinks:
+  - name: gcs
+    config:
+      project_id: google-project-id
+      url: gcs://bucket_name/target_folder
+      object_prefix: github-users
+      service_account_base64:
+      service_account_json: |-
+        {
+          "type": "service_account",
+          "private_key_id": "xxxxxxx",
+          "private_key": "xxxxxxx",
+          "client_email": "xxxxxxx",
+          "client_id": "xxxxxxx",
+          "auth_uri": "https://accounts.google.com/o/oauth2/auth",
+          "token_uri": "https://oauth2.googleapis.com/token",
+          "auth_provider_x509_cert_url": "xxxxxxx",
+          "client_x509_cert_url": "xxxxxxx"
+        }
+```
+
+## Config Definition
+
+| Key | Value | Example | Description | |
+| :-- | :---- | :------ | :---------- | :-- |
+| `project_id` | `string` | `google-project-id` | Google Cloud Storage project ID | *required* |
+| `url` | `string` | `gcs://bucket_name/target_folder` | URL of the target folder, with the bucket name and path, in the format `gcs://<bucket_name>/<path_to_folder>` | *required* |
+| `object_prefix` | `string` | `github-users` | Prefix for the output `.ndjson` object name. The sink writes a `<object_prefix>-<timestamp>.ndjson` object in the given path; if no prefix is provided, the object is named `<timestamp>.ndjson`. With the example above, the output object would be `github-users-<timestamp>.ndjson`. | *optional* |
+| `service_account_base64` | `string` | `ewog....fQo=` | Service account key as a base64-encoded string. Takes precedence over `service_account_json` | *optional* |
+| `service_account_json` | `string` | `{"private_key": .., "private_id": ...}` | Service account key as a JSON string | *optional* |
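+
+For illustration (hypothetical timestamp): with the sample config above, a single run writes an object such as `target_folder/github-users-2022-08-05T10:00:00Z.ndjson` to the `bucket_name` bucket, with one JSON document per line.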
+
+
+## Contributing
+
+Refer to the [contribution guidelines](../../../docs/docs/contribute/guide.md#adding-a-new-extractor) for information on contributing to this module.
diff --git a/plugins/sinks/gcs/client.go b/plugins/sinks/gcs/client.go
new file mode 100644
index 000000000..177428aff
--- /dev/null
+++ b/plugins/sinks/gcs/client.go
@@ -0,0 +1,43 @@
+package gcs
+
+import (
+	"context"
+
+	"cloud.google.com/go/storage"
+	"github.com/pkg/errors"
+	"google.golang.org/api/option"
+)
+
+type Writer interface {
+	WriteData([]byte) error
+	Close() error
+}
+
+type gcsWriter struct {
+	writer *storage.Writer
+}
+
+func newWriter(ctx context.Context, serviceAccountJSON []byte, bucketname string, filepath string) (*gcsWriter, error) {
+	client, err := storage.NewClient(ctx, option.WithCredentialsJSON(serviceAccountJSON))
+	if err != nil {
+		return nil, errors.Wrap(err, "error in creating client")
+	}
+
+	writer := client.Bucket(bucketname).Object(filepath).NewWriter(ctx)
+
+	return &gcsWriter{
+		writer: writer,
+	}, nil
+}
+
+func (c *gcsWriter) WriteData(data []byte) error {
+	if _, err := c.writer.Write(data); err != nil {
+		return errors.Wrap(err, "error in writing data to an object")
+	}
+
+	return nil
+}
+
+func (c *gcsWriter) Close() error {
+	return c.writer.Close()
+}
diff --git a/plugins/sinks/gcs/gcs.go b/plugins/sinks/gcs/gcs.go
new file mode 100644
index 000000000..2b6250679
--- /dev/null
+++ b/plugins/sinks/gcs/gcs.go
@@ -0,0 +1,163 @@
+package gcs
+
+import (
+	"context"
+	_ "embed"
+	"encoding/base64"
+	"fmt"
+	"net/url"
+	"strings"
+	"time"
+
+	"github.com/MakeNowJust/heredoc"
+	"github.com/odpf/meteor/models"
+	assetsv1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2"
+	"github.com/odpf/meteor/plugins"
+	"github.com/odpf/meteor/registry"
+	"github.com/odpf/salt/log"
+	"github.com/pkg/errors"
+)
+
+//go:embed README.md
+var summary string
+
+type Config struct {
+	ProjectID            string `mapstructure:"project_id" validate:"required"`
+	URL                  string `mapstructure:"url" validate:"required"`
+	ObjectPrefix         string `mapstructure:"object_prefix"`
+	ServiceAccountJSON   string `mapstructure:"service_account_json"`
+	ServiceAccountBase64 string `mapstructure:"service_account_base64"`
+}
+
+var info = plugins.Info{
+	Description: "saves data in a Google Cloud Storage bucket",
+	Summary:     summary,
+	SampleConfig: heredoc.Doc(`
+		project_id: google-project-id
+		url: gcs://bucket_name/target_folder
+		object_prefix: github-users
+		service_account_base64: ____base64_encoded_service_account____
+		service_account_json: |-
+			{
+				"type": "service_account",
+				"private_key_id": "xxxxxxx",
+				"private_key": "xxxxxxx",
+				"client_email": "xxxxxxx",
+				"client_id": "xxxxxxx",
+				"auth_uri": "https://accounts.google.com/o/oauth2/auth",
+				"token_uri": "https://oauth2.googleapis.com/token",
+				"auth_provider_x509_cert_url": "xxxxxxx",
+				"client_x509_cert_url": "xxxxxxx"
+			}
+	`),
+	Tags: []string{"gcs", "sink"},
+}
+
+type Sink struct {
+	plugins.BasePlugin
+	logger log.Logger
+	writer Writer
+	config Config
+}
+
+func New(logger log.Logger) plugins.Syncer {
+	s := &Sink{
+		logger: logger,
+	}
+	s.BasePlugin = plugins.NewBasePlugin(info, &s.config)
+
+	return s
+}
+
+func (s *Sink) Init(ctx context.Context, config plugins.Config) (err error) {
+	if err = s.BasePlugin.Init(ctx, config); err != nil {
+		return err
+	}
+
+	if err := s.validateServiceAccountKey(); err != nil {
+		return err
+	}
+
+	bucketname, objectname := s.resolveBucketPath()
+
+	if s.writer, err = newWriter(ctx, []byte(s.config.ServiceAccountJSON), bucketname, objectname); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (s *Sink) validateServiceAccountKey() error {
+	if s.config.ServiceAccountBase64 == "" && s.config.ServiceAccountJSON == "" {
+		return errors.New("credentials are not specified, failed to create client")
+	}
+
+	if s.config.ServiceAccountBase64 != "" {
+		serviceAccountJSON, err := base64.StdEncoding.DecodeString(s.config.ServiceAccountBase64)
+		if err != nil || len(serviceAccountJSON) == 0 {
+			return errors.Wrap(err, "failed to decode base64 service account")
+		}
+		s.config.ServiceAccountJSON = string(serviceAccountJSON)
+	}
+	return nil
+}
+
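+// resolveBucketPath splits the configured URL into the bucket name and the
+// object name for this run. For example, with url "gcs://bucket_name/target_folder"
+// and object_prefix "github-users", it returns bucket "bucket_name" and an object
+// name like "target_folder/github-users-<RFC3339 timestamp>.ndjson".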
+func (s *Sink) resolveBucketPath() (string, string) {
+	result, _ := url.Parse(s.config.URL)
+
+	bucketname := result.Host
+	path := strings.TrimPrefix(result.Path, "/")
+	timestamp := time.Now().Format(time.RFC3339)
+
+	objectprefix := s.config.ObjectPrefix
+
+	if objectprefix != "" && objectprefix[len(objectprefix)-1] != '-' {
+		objectprefix = fmt.Sprintf("%s-", objectprefix)
+	}
+
+	objectname := fmt.Sprintf("%s%s.ndjson", objectprefix, timestamp)
+
+	if path != "" {
+		objectname = fmt.Sprintf("%s/%s%s.ndjson", path, objectprefix, timestamp)
+	}
+
+	return bucketname, objectname
+}
+
+func (s *Sink) Sink(ctx context.Context, batch []models.Record) (err error) {
+	data := make([]*assetsv1beta2.Asset, 0, len(batch))
+
+	for _, record := range batch {
+		data = append(data, record.Data())
+	}
+	if err = s.writeData(data); err != nil {
+		return errors.Wrap(err, "error in writing data to the object")
+	}
+	return nil
+}
+
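+// writeData writes the assets to the GCS object as newline-delimited JSON:
+// one JSON document per asset, each followed by a "\n" separator.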
+func (s *Sink) writeData(data []*assetsv1beta2.Asset) (err error) {
+	for _, asset := range data {
+		jsonBytes, _ := models.ToJSON(asset)
+
+		if err := s.writer.WriteData(jsonBytes); err != nil {
+			return err
+		}
+
+		if err := s.writer.WriteData([]byte("\n")); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (s *Sink) Close() error {
+	return s.writer.Close()
+}
+
+func init() {
+	if err := registry.Sinks.Register("gcs", func() plugins.Syncer {
+		return New(plugins.GetLog())
+	}); err != nil {
+		panic(err)
+	}
+}
diff --git a/plugins/sinks/gcs/gcs_test.go b/plugins/sinks/gcs/gcs_test.go
new file mode 100644
index 000000000..174b56941
--- /dev/null
+++ b/plugins/sinks/gcs/gcs_test.go
@@ -0,0 +1,113 @@
+//go:build plugins
+// +build plugins
+
+package gcs_test
+
+import (
+	"context"
+	"testing"
+
+	"github.com/odpf/meteor/models"
+	v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2"
+	"github.com/odpf/meteor/plugins"
+	g "github.com/odpf/meteor/plugins/sinks/gcs"
+	testUtils "github.com/odpf/meteor/test/utils"
+	"github.com/odpf/meteor/utils"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+	"google.golang.org/protobuf/types/known/anypb"
+)
+
+var validConfig = map[string]interface{}{
+	"project_id":             "google-project-id",
+	"url":                    "gcs://bucket_name/target_folder",
+	"object_prefix":          "github-users",
+	"service_account_base64": "base 64 encoded key",
+}
+
+func TestInit(t *testing.T) {
+
+	t.Run("should return error if config is invalid", func(t *testing.T) {
+		gcsSink := g.New(testUtils.Logger)
+		actualError := gcsSink.Init(context.TODO(), plugins.Config{RawConfig: map[string]interface{}{
+			"project_id": "",
+		}})
+		assert.ErrorAs(t, actualError, &plugins.InvalidConfigError{})
+	})
+
+	t.Run("should return error if service account json and service account base64 are missing", func(t *testing.T) {
+
+		gcsSink := g.New(testUtils.Logger)
+		actualError := gcsSink.Init(context.TODO(), plugins.Config{RawConfig: map[string]interface{}{
+			"project_id": "google-project-id",
+			"url":        "gcs://bucket_name/target_folder",
+		}})
+		assert.ErrorContains(t, actualError, "credentials are not specified, failed to create client")
+	})

+	t.Run("should return error if unable to decode base64 service account key", func(t *testing.T) {
+
+		gcsSink := g.New(testUtils.Logger)
+		actualError := gcsSink.Init(context.TODO(), plugins.Config{RawConfig: map[string]interface{}{
+			"project_id":             "google-project-id",
+			"url":                    "gcs://bucket_name/target_folder",
+			"service_account_base64": "----", // invalid
+		}})
+		assert.ErrorContains(t, actualError, "failed to decode base64 service account")
+	})
+}
+
+func TestSink(t *testing.T) {
+
+	t.Run("should write data in bucket and return nil error on success", func(t *testing.T) {
+		u := &v1beta2.User{
+			FullName: "John Doe",
+			Email:    "john.doe@odpf.com",
+			Attributes: utils.TryParseMapToProto(map[string]interface{}{
+				"org_unit_path": "/",
+				"aliases":       "doe.john@odpf.com,johndoe@odpf.com",
+			}),
+		}
+		user, _ := anypb.New(u)
+		data := &v1beta2.Asset{
+			Data: user,
+		}
+		jsonBytes, _ := models.ToJSON(data)
+
+		ctx := context.TODO()
+		writer := new(mockWriter)
+		writer.On("WriteData", jsonBytes).Return(nil)
+		writer.On("WriteData", []byte("\n")).Return(nil)
+
+		gcsSink := g.New(testUtils.Logger)
+
+		err := gcsSink.Init(context.TODO(), plugins.Config{RawConfig: map[string]interface{}{
+			"project_id":           "google-project-id",
+			"url":                  "gcs://bucket_name/target_folder",
+			"service_account_json": `{"type": "service_account"}`,
+		}})
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		err = gcsSink.Sink(ctx, []models.Record{models.NewRecord(data)})
+
+		assert.NoError(t, err)
+	})
+}
+
+type mockWriter struct {
+	mock.Mock
+}
+
+func (m *mockWriter) WriteData(jsonBytes []byte) error {
+	args := m.Called(jsonBytes)
+
+	return args.Error(0)
+}
+
+func (m *mockWriter) Close() error {
+	args := m.Called()
+
+	return args.Error(0)
+}
diff --git a/plugins/sinks/populate.go b/plugins/sinks/populate.go
index 8906e5796..0f309915c 100644
--- a/plugins/sinks/populate.go
+++ b/plugins/sinks/populate.go
@@ -4,6 +4,7 @@ import (
 	_ "github.com/odpf/meteor/plugins/sinks/compass"
 	_ "github.com/odpf/meteor/plugins/sinks/console"
 	_ "github.com/odpf/meteor/plugins/sinks/file"
+	_ "github.com/odpf/meteor/plugins/sinks/gcs"
 	_ "github.com/odpf/meteor/plugins/sinks/http"
 	_ "github.com/odpf/meteor/plugins/sinks/kafka"
 	_ "github.com/odpf/meteor/plugins/sinks/shield"