Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(thanos): add support for named stores #14638

Merged
merged 18 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/loki/config_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,9 +567,9 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
}
}

if !reflect.DeepEqual(cfg.Common.Storage.ObjectStore, defaults.StorageConfig.ObjectStore) {
if !reflect.DeepEqual(cfg.Common.Storage.ObjectStore, defaults.StorageConfig.ObjectStore.Config) {
applyConfig = func(r *ConfigWrapper) {
r.StorageConfig.ObjectStore = r.Common.Storage.ObjectStore
r.StorageConfig.ObjectStore.Config = r.Common.Storage.ObjectStore
}
}

Expand Down
134 changes: 134 additions & 0 deletions pkg/loki/config_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ import (

"github.com/grafana/loki/v3/pkg/distributor"
"github.com/grafana/loki/v3/pkg/loki/common"
azurebucket "github.com/grafana/loki/v3/pkg/storage/bucket/azure"
"github.com/grafana/loki/v3/pkg/storage/bucket/filesystem"
"github.com/grafana/loki/v3/pkg/storage/bucket/gcs"
"github.com/grafana/loki/v3/pkg/storage/bucket/s3"
"github.com/grafana/loki/v3/pkg/storage/bucket/swift"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/alibaba"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/aws"
Expand Down Expand Up @@ -842,6 +846,48 @@ storage_config:
assert.Equal(t, "789abc", config.StorageConfig.NamedStores.AWS["store-2"].S3Config.SecretAccessKey.String())
})

t.Run("named storage config (thanos) provided via config file is preserved", func(t *testing.T) {
namedStoresConfig := `common:
storage:
object_store:
s3:
endpoint: s3://common-bucket
region: us-east1
access_key_id: abc123
secret_access_key: def789
storage_config:
object_store:
named_stores:
s3:
store-1:
endpoint: s3://foo-bucket
region: us-west1
access_key_id: 123abc
secret_access_key: 789def
store-2:
endpoint: s3://bar-bucket
region: us-west2
access_key_id: 456def
secret_access_key: 789abc`
config, _ := testContext(namedStoresConfig, nil)

// should be set by common config
assert.Equal(t, "s3://common-bucket", config.StorageConfig.ObjectStore.S3.Endpoint)
assert.Equal(t, "us-east1", config.StorageConfig.ObjectStore.S3.Region)
assert.Equal(t, "abc123", config.StorageConfig.ObjectStore.S3.AccessKeyID)
assert.Equal(t, "def789", config.StorageConfig.ObjectStore.S3.SecretAccessKey.String())

assert.Equal(t, "s3://foo-bucket", config.StorageConfig.ObjectStore.NamedStores.S3["store-1"].Endpoint)
assert.Equal(t, "us-west1", config.StorageConfig.ObjectStore.NamedStores.S3["store-1"].Region)
assert.Equal(t, "123abc", config.StorageConfig.ObjectStore.NamedStores.S3["store-1"].AccessKeyID)
assert.Equal(t, "789def", config.StorageConfig.ObjectStore.NamedStores.S3["store-1"].SecretAccessKey.String())

assert.Equal(t, "s3://bar-bucket", config.StorageConfig.ObjectStore.NamedStores.S3["store-2"].Endpoint)
assert.Equal(t, "us-west2", config.StorageConfig.ObjectStore.NamedStores.S3["store-2"].Region)
assert.Equal(t, "456def", config.StorageConfig.ObjectStore.NamedStores.S3["store-2"].AccessKeyID)
assert.Equal(t, "789abc", config.StorageConfig.ObjectStore.NamedStores.S3["store-2"].SecretAccessKey.String())
})

t.Run("partial ruler config from file is honored for overriding things like bucket names", func(t *testing.T) {
specificRulerConfig := `common:
storage:
Expand Down Expand Up @@ -2280,3 +2326,91 @@ func TestNamedStores_applyDefaults(t *testing.T) {
assert.Equal(t, expected, (alibaba.OssConfig)(nsCfg.AlibabaCloud["store-8"]))
})
}

func TestBucketNamedStores_applyDefaults(t *testing.T) {
namedStoresConfig := `storage_config:
object_store:
named_stores:
s3:
store-1:
endpoint: s3.test
bucket_name: foobar
dualstack_enabled: false
azure:
store-2:
account_name: foo
container_name: bar
max_retries: 3
gcs:
store-3:
bucket_name: foobar
filesystem:
store-4:
dir: foobar
swift:
store-5:
container_name: foobar
request_timeout: 30s
`
// make goconst happy
bucketName := "foobar"

config, defaults, err := configWrapperFromYAML(t, namedStoresConfig, nil)
require.NoError(t, err)

nsCfg := config.StorageConfig.ObjectStore.NamedStores

t.Run("s3", func(t *testing.T) {
assert.Len(t, nsCfg.S3, 1)

// expect the defaults to be set on named store config
expected := defaults.StorageConfig.ObjectStore.S3
expected.BucketName = bucketName
expected.Endpoint = "s3.test"
// override defaults
expected.DualstackEnabled = false

assert.Equal(t, expected, (s3.Config)(nsCfg.S3["store-1"]))
})

t.Run("azure", func(t *testing.T) {
assert.Len(t, nsCfg.Azure, 1)

expected := defaults.StorageConfig.ObjectStore.Azure
expected.StorageAccountName = "foo"
expected.ContainerName = "bar"
// overrides defaults
expected.MaxRetries = 3

assert.Equal(t, expected, (azurebucket.Config)(nsCfg.Azure["store-2"]))
})

t.Run("gcs", func(t *testing.T) {
assert.Len(t, nsCfg.GCS, 1)

expected := defaults.StorageConfig.ObjectStore.GCS
expected.BucketName = bucketName

assert.Equal(t, expected, (gcs.Config)(nsCfg.GCS["store-3"]))
})

t.Run("filesystem", func(t *testing.T) {
assert.Len(t, nsCfg.Filesystem, 1)

expected := defaults.StorageConfig.ObjectStore.Filesystem
expected.Directory = bucketName

assert.Equal(t, expected, (filesystem.Config)(nsCfg.Filesystem["store-4"]))
})

t.Run("swift", func(t *testing.T) {
assert.Len(t, nsCfg.Swift, 1)

expected := defaults.StorageConfig.ObjectStore.Swift
expected.ContainerName = bucketName
// override defaults
expected.RequestTimeout = 30 * time.Second

assert.Equal(t, expected, (swift.Config)(nsCfg.Swift["store-5"]))
})
}
7 changes: 6 additions & 1 deletion pkg/loki/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"strings"

"github.com/grafana/loki/v3/pkg/storage/bucket"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/types"
Expand Down Expand Up @@ -97,7 +98,11 @@ func validateSchemaValues(c *Config) []error {
errs = append(errs, fmt.Errorf("unrecognized `store` (index) type `%s`, choose one of: %s", cfg.IndexType, strings.Join(types.SupportedIndexTypes, ", ")))
}

if !util.StringsContain(types.TestingStorageTypes, cfg.ObjectType) &&
if c.StorageConfig.UseThanosObjstore {
if !util.StringsContain(bucket.SupportedBackends, cfg.ObjectType) && !c.StorageConfig.ObjectStore.NamedStores.Exists(cfg.ObjectType) {
errs = append(errs, fmt.Errorf("unrecognized `object_store` type `%s`, which also does not match any named_stores. Choose one of: %s. Or choose a named_store", cfg.ObjectType, strings.Join(bucket.SupportedBackends, ", ")))
}
} else if !util.StringsContain(types.TestingStorageTypes, cfg.ObjectType) &&
!util.StringsContain(types.SupportedStorageTypes, cfg.ObjectType) &&
!util.StringsContain(types.DeprecatedStorageTypes, cfg.ObjectType) {
if !c.StorageConfig.NamedStores.Exists(cfg.ObjectType) {
Expand Down
64 changes: 28 additions & 36 deletions pkg/storage/bucket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,83 +49,75 @@ var (
metrics = objstore.BucketMetrics(prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer), "")
)

// StorageBackendConfig holds configuration for accessing long-term storage.
type StorageBackendConfig struct {
// Config holds configuration for accessing long-term storage.
type Config struct {
// Backends
S3 s3.Config `yaml:"s3"`
GCS gcs.Config `yaml:"gcs"`
Azure azure.Config `yaml:"azure"`
Swift swift.Config `yaml:"swift"`
Filesystem filesystem.Config `yaml:"filesystem"`

StoragePrefix string `yaml:"storage_prefix"`

// Used to inject additional backends into the config. Allows for this config to
// be embedded in multiple contexts and support non-object storage based backends.
ExtraBackends []string `yaml:"-"`

// Not used internally, meant to allow callers to wrap Buckets
// created using this config
Middlewares []func(objstore.InstrumentedBucket) (objstore.InstrumentedBucket, error) `yaml:"-"`
}

// Returns the SupportedBackends for the package and any custom backends injected into the config.
func (cfg *StorageBackendConfig) SupportedBackends() []string {
func (cfg *Config) SupportedBackends() []string {
return append(SupportedBackends, cfg.ExtraBackends...)
}

// RegisterFlags registers the backend storage config.
func (cfg *StorageBackendConfig) RegisterFlags(f *flag.FlagSet) {
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("", f)
}

func (cfg *StorageBackendConfig) RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir string, f *flag.FlagSet) {
func (cfg *Config) RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir string, f *flag.FlagSet) {
cfg.GCS.RegisterFlagsWithPrefix(prefix, f)
cfg.S3.RegisterFlagsWithPrefix(prefix, f)
cfg.Azure.RegisterFlagsWithPrefix(prefix, f)
cfg.Swift.RegisterFlagsWithPrefix(prefix, f)
cfg.Filesystem.RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir, f)
f.StringVar(&cfg.StoragePrefix, prefix+"storage-prefix", "", "Prefix for all objects stored in the backend storage. For simplicity, it may only contain digits and English alphabet letters.")
}

func (cfg *StorageBackendConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefixAndDefaultDirectory(prefix, "", f)
}

func (cfg *StorageBackendConfig) Validate() error {
func (cfg *Config) Validate() error {
if cfg.StoragePrefix != "" {
acceptablePrefixCharacters := regexp.MustCompile(validPrefixCharactersRegex)
if !acceptablePrefixCharacters.MatchString(cfg.StoragePrefix) {
return ErrInvalidCharactersInStoragePrefix
}
}

if err := cfg.S3.Validate(); err != nil {
return err
}

return nil
}

// Config holds configuration for accessing long-term storage.
type Config struct {
StorageBackendConfig `yaml:",inline"`
StoragePrefix string `yaml:"storage_prefix"`

// Not used internally, meant to allow callers to wrap Buckets
// created using this config
Middlewares []func(objstore.InstrumentedBucket) (objstore.InstrumentedBucket, error) `yaml:"-"`
}

// RegisterFlags registers the backend storage config.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("", f)
}

func (cfg *Config) RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir string, f *flag.FlagSet) {
cfg.StorageBackendConfig.RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir, f)
f.StringVar(&cfg.StoragePrefix, prefix+"storage-prefix", "", "Prefix for all objects stored in the backend storage. For simplicity, it may only contain digits and English alphabet letters.")
type ConfigWithNamedStores struct {
Config `yaml:",inline"`
NamedStores NamedStores `yaml:"named_stores"`
}

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefixAndDefaultDirectory(prefix, "", f)
}

func (cfg *Config) Validate() error {
if cfg.StoragePrefix != "" {
acceptablePrefixCharacters := regexp.MustCompile(validPrefixCharactersRegex)
if !acceptablePrefixCharacters.MatchString(cfg.StoragePrefix) {
return ErrInvalidCharactersInStoragePrefix
}
func (cfg *ConfigWithNamedStores) Validate() error {
if err := cfg.Config.Validate(); err != nil {
return err
}

return cfg.StorageBackendConfig.Validate()
return cfg.NamedStores.Validate()
}

func (cfg *Config) disableRetries(backend string) error {
Expand Down
Loading
Loading