Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add GCS sink #469

Merged
merged 6 commits into from
Feb 16, 2023
Merged

feat: add GCS sink #469

merged 6 commits into from
Feb 16, 2023

Conversation

Chief-Rishab
Copy link
Member

No description provided.

"google.golang.org/api/option"
)

type GCSClient interface {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can call this Writer?

@@ -0,0 +1,41 @@
# GCS

Sinks json data to a file in `ndjson` format in a Google Cloud Storage bucket
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can rephrase this as:

Sinks json data to a file as ndjson format in Google Cloud Storage bucket

writer := client.Bucket(bucketname).Object(filepath).NewWriter(ctx)

return &gcsClient{
client: client,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see client getting used anywhere, should we get rid of it?

Copy link
Member Author

@Chief-Rishab Chief-Rishab Jan 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, its's only used to create the writer, will update it

writer *storage.Writer
}

func newGCSClient(ctx context.Context, serviceAccountJSON []byte, bucketname string, filepath string) (GCSClient, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should return *gcsClient instead of interface.

Remember always return struct and accept interfaces in args


func (c *gcsClient) WriteData(jsonBytes []byte) error {
if _, err := c.writer.Write(jsonBytes); err != nil {
return errors.Wrap(err, "error in writing json data to an object")
Copy link
Member

@kushsharma kushsharma Jan 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why client should know if the data is json or something else? It's just raw bytes for it. So this error won't be applicable if you decide to write simple text.

Comment on lines 43 to 49
func (c *gcsClient) Close() error {
if err := c.writer.Close(); err != nil {
return errors.Wrap(err, "error closing the writer")
}

return nil
}
Copy link
Member

@kushsharma kushsharma Jan 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe simplify this as

func (c *gcsClient) Close() error {
	return  c.writer.Close()
}

func (s *Sink) resolveBucketandObjectNames() (string, string) {
dirs := strings.Split(s.config.Path, "/")
bucketname := dirs[0]
timestamp := time.Now().Format("2006.01.02 15:04:05")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest using a pre-defined time RFC format, specially the one without space.

Copy link
Member Author

@Chief-Rishab Chief-Rishab Jan 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will RFC3339 do ? Output format is "2006-01-02T15:04:05Z07:00"

Comment on lines 109 to 111
if s.config.ObjectPrefix != "" {
s.config.ObjectPrefix = s.config.ObjectPrefix + "-"
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if user has provided prefix as hello-, you will add double hyphens?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be a single hyphen only. I will add a check for that

s.config.ObjectPrefix = s.config.ObjectPrefix + "-"
}

objectname := s.config.ObjectPrefix + timestamp + ".ndjson"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use fmt.Sprintf to build strings.


objectname := s.config.ObjectPrefix + timestamp + ".ndjson"
if len(dirs) > 1 {
objectname = dirs[len(dirs)-1] + "/" + s.config.ObjectPrefix + timestamp + ".ndjson"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use fmt.Sprintf to build strings.

}

func (s *Sink) validateServiceAccountKey() error {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary space?

Comment on lines 149 to 153
if err := s.client.Close(); err != nil {
return err
}
return nil
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return s.client.Close()

@Chief-Rishab Chief-Rishab marked this pull request as ready for review January 31, 2023 11:25
@ravisuhag ravisuhag changed the title Add GCS sink feat: add GCS sink Jan 31, 2023
return nil
}

func (s *Sink) resolveBucketandObjectNames() (string, string) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A better name could be resolveBucketPath

Comment on lines 105 to 106
dirs := strings.Split(s.config.Path, "/")
bucketname := dirs[0]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A notation pretty common in object storage is gcs://bucketname or s3://bucketname I guess this line would fail here. Can we use url.Parse(...)?

objectname := fmt.Sprintf("%s%s.ndjson", objectprefix, timestamp)

if len(dirs) > 1 {
objectname = fmt.Sprintf("%s/%s%s.ndjson", dirs[len(dirs)-1], objectprefix, timestamp)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How will dirs[len(dirs)-1] work for a path like gcs://bucketname/path1/path2/

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default path format I assumed was the one I got from GCS bucket/folder copy path options. If that's the required case, will handle the gcs prefix

return nil
}

func (s *Sink) Close() (err error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(err error) could be just error

@kushsharma kushsharma added enhancement New feature or request sink Add new sink labels Feb 6, 2023
@Chief-Rishab
Copy link
Member Author

Chief-Rishab commented Feb 6, 2023

@kushsharma done with the changes, taking the input as a URL in the sink config, now URL format it accepts in config is gcs://bucketname/optional_folder_paths.

@kushsharma
Copy link
Member

LGTM. Nice work.

@ravisuhag ravisuhag merged commit a8e255c into main Feb 16, 2023
@ravisuhag ravisuhag deleted the gcs-sink branch February 16, 2023 16:51
anjali9791 added a commit to anjali9791/meteor that referenced this pull request Feb 20, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request sink Add new sink
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants