Skip to content

Commit

Permalink
Add support for object prefixes on S3
Browse files Browse the repository at this point in the history
This allows sharing a single bucket between different thanos-store
compoenents, or anything else.
  • Loading branch information
claytono committed Dec 14, 2018
1 parent 964aa13 commit 7598121
Showing 1 changed file with 32 additions and 6 deletions.
38 changes: 32 additions & 6 deletions pkg/objstore/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const DirDelim = "/"
// Config stores the configuration for s3 bucket.
type Config struct {
Bucket string `yaml:"bucket"`
Prefix *string `yaml:"prefix"`
Endpoint string `yaml:"endpoint"`
AccessKey string `yaml:"access_key"`
Insecure bool `yaml:"insecure"`
Expand All @@ -53,6 +54,7 @@ type HTTPConfig struct {
type Bucket struct {
logger log.Logger
name string
prefix *string
client *minio.Client
sse encrypt.ServerSide
}
Expand All @@ -77,7 +79,7 @@ func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error
return NewBucketWithConfig(logger, config, component)
}

// NewBucket returns a new Bucket using the provided s3 config values.
// NewBucketWithConfig returns a new Bucket using the provided s3 config values.
func NewBucketWithConfig(logger log.Logger, config Config, component string) (*Bucket, error) {
var chain []credentials.Provider

Expand Down Expand Up @@ -149,6 +151,10 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string) (*B
client: client,
sse: sse,
}
if config.Prefix != nil {
prefix := strings.TrimSuffix(*config.Prefix, DirDelim) + DirDelim
bkt.prefix = &prefix
}
return bkt, nil
}

Expand Down Expand Up @@ -182,6 +188,7 @@ func ValidateForTests(conf Config) error {
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error) error {
// Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
// object itself as one prefix item.
dir = b.withPrefix(dir)
if dir != "" {
dir = strings.TrimSuffix(dir, DirDelim) + DirDelim
}
Expand All @@ -195,7 +202,7 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error) err
if object.Key == "" {
continue
}
if err := f(object.Key); err != nil {
if err := f(b.withoutPrefix(object.Key)); err != nil {
return err
}
}
Expand All @@ -210,7 +217,7 @@ func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (
return nil, err
}
}
r, err := b.client.GetObjectWithContext(ctx, b.name, name, *opts)
r, err := b.client.GetObjectWithContext(ctx, b.name, b.withPrefix(name), *opts)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -239,7 +246,7 @@ func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (

// Exists checks if the given object exists.
func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
_, err := b.client.StatObject(b.name, name, minio.StatObjectOptions{})
_, err := b.client.StatObject(b.name, b.withPrefix(name), minio.StatObjectOptions{})
if err != nil {
if b.IsObjNotFoundErr(err) {
return false, nil
Expand All @@ -261,7 +268,7 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
fileSize = fileInfo.Size()
}

_, err = b.client.PutObjectWithContext(ctx, b.name, name, r, fileSize,
_, err = b.client.PutObjectWithContext(ctx, b.name, b.withPrefix(name), r, fileSize,
minio.PutObjectOptions{ServerSideEncryption: b.sse, UserMetadata: map[string]string{"X-Amz-Acl": "bucket-owner-full-control"}},
)

Expand All @@ -270,14 +277,28 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {

// Delete removes the object with the given name.
func (b *Bucket) Delete(ctx context.Context, name string) error {
return b.client.RemoveObject(b.name, name)
return b.client.RemoveObject(b.name, b.withPrefix(name))
}

// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
func (b *Bucket) IsObjNotFoundErr(err error) bool {
return minio.ToErrorResponse(err).Code == "NoSuchKey"
}

func (b *Bucket) withPrefix(name string) string {
if b.prefix != nil {
return *b.prefix + name
}
return name
}

func (b *Bucket) withoutPrefix(name string) string {
if b.prefix != nil {
return strings.TrimPrefix(name, *b.prefix)
}
return name
}

func (b *Bucket) Close() error { return nil }

func configFromEnv() Config {
Expand All @@ -288,6 +309,11 @@ func configFromEnv() Config {
SecretKey: os.Getenv("S3_SECRET_KEY"),
}

if prefix, ok := os.LookupEnv("S3_PREFIX"); ok {
prefix = strings.TrimSuffix(prefix, DirDelim) + DirDelim
c.Prefix = &prefix
}

c.Insecure, _ = strconv.ParseBool(os.Getenv("S3_INSECURE"))
c.SignatureV2, _ = strconv.ParseBool(os.Getenv("S3_SIGNATURE_VERSION2"))
return c
Expand Down

0 comments on commit 7598121

Please sign in to comment.