Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add GCS sink #469

Merged
merged 6 commits into from
Feb 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions docs/docs/concepts/sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,32 @@ sinks:

Sinks metadata to a file in `json/yaml` format as per the config defined.

* **Google Cloud Storage**

```yaml
sinks:
- name: gcs
config:
project_id: google-project-id
url: gcs://bucket_name/target_folder
object_prefix : github-users
service_account_base64: <base64 encoded service account key>
service_account_json: |-
{
"type": "service_account",
"private_key_id": "xxxxxxx",
"private_key": "xxxxxxx",
"client_email": "xxxxxxx",
"client_id": "xxxxxxx",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "xxxxxxx",
"client_x509_cert_url": "xxxxxxx"
}
```

Sinks JSON data to a file in ndjson format in a Google Cloud Storage bucket.

* **http**

```yaml
Expand Down
27 changes: 27 additions & 0 deletions docs/docs/reference/sinks.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,33 @@ sinks:
path: "./dir/sample.yaml"
format: "yaml"
```
## GCS

`Google Cloud Storage`

Sinks JSON data to a file in ndjson format in a Google Cloud Storage bucket.

```yaml
sinks:
- name: gcs
config:
project_id: google-project-id
url: gcs://bucket_name/target_folder
object_prefix : github-users
service_account_base64: <base64 encoded service account key>
service_account_json: |-
{
"type": "service_account",
"private_key_id": "xxxxxxx",
"private_key": "xxxxxxx",
"client_email": "xxxxxxx",
"client_id": "xxxxxxx",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "xxxxxxx",
"client_x509_cert_url": "xxxxxxx"
}
```

## http

Expand Down
41 changes: 41 additions & 0 deletions plugins/sinks/gcs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# GCS

Sinks JSON data to a file in ndjson format in a Google Cloud Storage bucket.

## Usage
```yaml
sinks:
- name: gcs
config:
project_id: google-project-id
url: gcs://bucket_name/target_folder
object_prefix : github-users
service_account_base64: <base64 encoded service account key>
service_account_json: |-
{
"type": "service_account",
"private_key_id": "xxxxxxx",
"private_key": "xxxxxxx",
"client_email": "xxxxxxx",
"client_id": "xxxxxxx",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "xxxxxxx",
"client_x509_cert_url": "xxxxxxx"
}
```

## Config Definition

| Key | Value | Example | Description | |
| :-- | :---- | :------ | :---------- | :-- |
|`project_id` | `string` | `google-project-id` | Google Cloud Storage Project ID | *required*|
| `url` | `string` | `gcs://bucket_name/target_folder` | the URL with bucket name and path of the folder with format `gcs://<bucket_name>/<optional_folder_path>` | *required* |
| `object_prefix` | `string` | `github-users` | the prefix of the `.ndjson` file name that the JSON data is written to; a timestamp is appended to it. <br><br> Note: if no prefix is provided, the output data is written to a `timestamp.ndjson` file in the provided path. Otherwise, as in the given example, the output file will be `github-users-timestamp.ndjson` | *optional* |
| `service_account_base64` | `string` | `ewog....fQo=` | Service Account Key in base64 encoded string. Takes precedence over `service_account_json` value | *optional* |
| `service_account_json` | `string` | `{"private_key": .., "private_id": ...}` | Service Account Key in JSON string | *optional* |


## Contributing

Refer to the [contribution guidelines](../../../docs/docs/contribute/guide.md#adding-a-new-extractor) for information on contributing to this module.
43 changes: 43 additions & 0 deletions plugins/sinks/gcs/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package gcs

import (
"context"

"cloud.google.com/go/storage"
"github.com/pkg/errors"
"google.golang.org/api/option"
)

// Writer is the destination the sink writes serialized records to.
type Writer interface {
	// WriteData writes data to the destination and reports any failure.
	WriteData(data []byte) error
	// Close closes the destination; Sink.Close delegates to it.
	Close() error
}

// gcsWriter writes bytes into a single Google Cloud Storage object.
type gcsWriter struct {
	writer *storage.Writer
	// client is the storage client the writer was created from; it is kept
	// so that Close can release it instead of leaking it.
	client *storage.Client
}

// newWriter creates a storage client authenticated with serviceAccountJSON and
// opens a writer on the object named filepath inside the bucket bucketname.
// It returns a wrapped error when the client cannot be created.
func newWriter(ctx context.Context, serviceAccountJSON []byte, bucketname string, filepath string) (*gcsWriter, error) {
	client, err := storage.NewClient(ctx, option.WithCredentialsJSON(serviceAccountJSON))
	if err != nil {
		return nil, errors.Wrap(err, "error in creating client")
	}

	return &gcsWriter{
		writer: client.Bucket(bucketname).Object(filepath).NewWriter(ctx),
		client: client,
	}, nil
}

// WriteData writes data to the underlying object writer.
func (c *gcsWriter) WriteData(data []byte) error {
	if _, err := c.writer.Write(data); err != nil {
		return errors.Wrap(err, "error in writing data to an object")
	}

	return nil
}

// Close closes the object writer and then the storage client. The writer's
// error takes priority because it reports whether the object was written;
// the client is closed regardless so it is never leaked.
func (c *gcsWriter) Close() error {
	writerErr := c.writer.Close()

	// client is nil only for a gcsWriter that was not built by newWriter.
	if c.client == nil {
		return writerErr
	}

	clientErr := c.client.Close()
	if writerErr != nil {
		return writerErr
	}
	if clientErr != nil {
		return errors.Wrap(clientErr, "error in closing client")
	}

	return nil
}
163 changes: 163 additions & 0 deletions plugins/sinks/gcs/gcs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package gcs

import (
"context"
_ "embed"
"encoding/base64"
"fmt"
"net/url"
"time"

"github.com/MakeNowJust/heredoc"
"github.com/odpf/meteor/models"
assetsv1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2"
"github.com/odpf/meteor/plugins"
"github.com/odpf/meteor/registry"
"github.com/odpf/salt/log"
"github.com/pkg/errors"
)

// summary holds the contents of README.md, embedded at build time; it is
// used as the plugin's Summary in info.
//
//go:embed README.md
var summary string

// Config is the user-supplied configuration of the GCS sink.
type Config struct {
// ProjectID is the Google Cloud project ID (required).
ProjectID string `mapstructure:"project_id" validate:"required"`
// URL has the form gcs://<bucket_name>/<optional_folder_path> (required).
URL string `mapstructure:"url" validate:"required"`
// ObjectPrefix is an optional prefix for the generated .ndjson object name.
ObjectPrefix string `mapstructure:"object_prefix"`
// ServiceAccountJSON is the service account key as a JSON string.
ServiceAccountJSON string `mapstructure:"service_account_json"`
// ServiceAccountBase64 is the service account key, base64 encoded. When set,
// its decoded value replaces ServiceAccountJSON.
ServiceAccountBase64 string `mapstructure:"service_account_base64"`
}

// info is the plugin metadata (description, README summary, sample
// configuration and tags) passed to plugins.NewBasePlugin in New.
var info = plugins.Info{
Description: "saves data in google cloud storage bucket",
Summary: summary,
SampleConfig: heredoc.Doc(`
project_id: google-project-id
url: gcs://bucket_name/target_folder
object_prefix : github-users
service_account_base64: ____base64_encoded_service_account____
service_account_json: |-
{
"type": "service_account",
"private_key_id": "xxxxxxx",
"private_key": "xxxxxxx",
"client_email": "xxxxxxx",
"client_id": "xxxxxxx",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "xxxxxxx",
"client_x509_cert_url": "xxxxxxx"
}
`),
Tags: []string{"gcs", "sink"},
}

// Sink is the Google Cloud Storage sink plugin. It writes each record it
// receives as one JSON line to its writer.
type Sink struct {
plugins.BasePlugin
logger log.Logger
// writer is the destination records are written to; it is set by Init.
writer Writer
// config is handed to plugins.NewBasePlugin in New — presumably so that
// BasePlugin.Init can populate it; confirm against the plugins package.
config Config
}

// New builds a GCS sink that uses logger, wiring its embedded BasePlugin to
// the plugin info and to the sink's own config value.
func New(logger log.Logger) plugins.Syncer {
	sink := &Sink{logger: logger}
	sink.BasePlugin = plugins.NewBasePlugin(info, &sink.config)
	return sink
}

// Init prepares the sink for use. It runs the base plugin initialisation with
// config, validates (and if needed decodes) the service account key, resolves
// the target bucket and object name from the configured URL, and opens the
// writer. Any failure is returned and leaves s.writer untouched.
func (s *Sink) Init(ctx context.Context, config plugins.Config) (err error) {
	if err = s.BasePlugin.Init(ctx, config); err != nil {
		return err
	}

	if err = s.validateServiceAccountKey(); err != nil {
		return err
	}

	bucketname, objectname := s.resolveBucketPath()

	// Assign to the interface field only on success: storing the nil
	// *gcsWriter returned on failure would make s.writer a non-nil interface
	// wrapping a nil pointer.
	w, err := newWriter(ctx, []byte(s.config.ServiceAccountJSON), bucketname, objectname)
	if err != nil {
		return err
	}
	s.writer = w

	return nil
}

// validateServiceAccountKey ensures that credentials are configured. When a
// base64 encoded key is given it is decoded and stored in
// config.ServiceAccountJSON, replacing any JSON key that was also configured.
// It returns an error when no credentials are given, when the base64 value
// cannot be decoded, or when it decodes to nothing.
func (s *Sink) validateServiceAccountKey() error {
	encoded := s.config.ServiceAccountBase64

	if encoded == "" {
		if s.config.ServiceAccountJSON == "" {
			return errors.New("credentials are not specified, failed to create client")
		}
		return nil
	}

	decoded, err := base64.StdEncoding.DecodeString(encoded)
	if err != nil {
		return errors.Wrap(err, "failed to decode base64 service account")
	}
	// Checked separately from err: errors.Wrap returns nil for a nil error, so
	// wrapping here would silently accept an empty key (e.g. a value made up
	// only of newlines, which the decoder ignores).
	if len(decoded) == 0 {
		return errors.New("failed to decode base64 service account: decoded value is empty")
	}

	s.config.ServiceAccountJSON = string(decoded)
	return nil
}

// resolveBucketPath derives the bucket name and the object name from the
// configured URL and object prefix.
//
// The bucket name is the URL host. The object name is
// "<folder>/<prefix>-<timestamp>.ndjson", where the folder part is omitted
// when the URL has no path, the prefix part is omitted when no prefix is
// configured, and the timestamp is the current time in RFC 3339 format.
//
// If the URL cannot be parsed, the bucket name and folder are left empty
// rather than panicking.
func (s *Sink) resolveBucketPath() (string, string) {
	var bucketname, folder string

	// url.Parse returns a nil URL on failure, so only read it on success.
	if parsed, err := url.Parse(s.config.URL); err == nil {
		bucketname = parsed.Host
		folder = parsed.Path
		// The path is empty for URLs such as gcs://bucket (the folder is
		// optional), so strip the leading slash only when there is one.
		if len(folder) > 0 && folder[0] == '/' {
			folder = folder[1:]
		}
	}

	objectprefix := s.config.ObjectPrefix
	if objectprefix != "" && objectprefix[len(objectprefix)-1] != '-' {
		objectprefix += "-"
	}

	objectname := fmt.Sprintf("%s%s.ndjson", objectprefix, time.Now().Format(time.RFC3339))
	if folder != "" {
		objectname = folder + "/" + objectname
	}

	return bucketname, objectname
}

// Sink extracts the asset from every record in batch and writes the assets to
// the sink's writer. A write failure is returned wrapped with context.
func (s *Sink) Sink(ctx context.Context, batch []models.Record) (err error) {
	assets := make([]*assetsv1beta2.Asset, len(batch))
	for i := range batch {
		assets[i] = batch[i].Data()
	}

	if writeErr := s.writeData(assets); writeErr != nil {
		return errors.Wrap(writeErr, "error in writing data to the object")
	}

	return nil
}

// writeData serializes each asset to JSON and writes it to the writer followed
// by a newline, producing one JSON document per line. It stops at the first
// serialization or write failure and returns that error.
func (s *Sink) writeData(data []*assetsv1beta2.Asset) (err error) {
	for _, asset := range data {
		jsonBytes, marshalErr := models.ToJSON(asset)
		if marshalErr != nil {
			// Surface the failure instead of writing an empty line.
			return errors.Wrap(marshalErr, "error in serializing asset to json")
		}

		if err = s.writer.WriteData(jsonBytes); err != nil {
			return err
		}

		if err = s.writer.WriteData([]byte("\n")); err != nil {
			return err
		}
	}

	return nil
}

// Close closes the sink's writer and returns the writer's error, if any.
func (s *Sink) Close() error {
return s.writer.Close()
}

func init() {
if err := registry.Sinks.Register("gcs", func() plugins.Syncer {
return New(plugins.GetLog())
}); err != nil {
panic(err)
}
}
Loading