diff --git a/pkg/loki/config_wrapper.go b/pkg/loki/config_wrapper.go index 4fa7800ef8661..4791bcfd25dbf 100644 --- a/pkg/loki/config_wrapper.go +++ b/pkg/loki/config_wrapper.go @@ -567,9 +567,9 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error { } } - if !reflect.DeepEqual(cfg.Common.Storage.ObjectStore, defaults.StorageConfig.ObjectStore) { + if !reflect.DeepEqual(cfg.Common.Storage.ObjectStore, defaults.StorageConfig.ObjectStore.Config) { applyConfig = func(r *ConfigWrapper) { - r.StorageConfig.ObjectStore = r.Common.Storage.ObjectStore + r.StorageConfig.ObjectStore.Config = r.Common.Storage.ObjectStore } } diff --git a/pkg/loki/config_wrapper_test.go b/pkg/loki/config_wrapper_test.go index 47d92cd8a92cb..203baecf4aec0 100644 --- a/pkg/loki/config_wrapper_test.go +++ b/pkg/loki/config_wrapper_test.go @@ -16,6 +16,10 @@ import ( "github.com/grafana/loki/v3/pkg/distributor" "github.com/grafana/loki/v3/pkg/loki/common" + azurebucket "github.com/grafana/loki/v3/pkg/storage/bucket/azure" + "github.com/grafana/loki/v3/pkg/storage/bucket/filesystem" + "github.com/grafana/loki/v3/pkg/storage/bucket/gcs" + "github.com/grafana/loki/v3/pkg/storage/bucket/s3" "github.com/grafana/loki/v3/pkg/storage/bucket/swift" "github.com/grafana/loki/v3/pkg/storage/chunk/client/alibaba" "github.com/grafana/loki/v3/pkg/storage/chunk/client/aws" @@ -842,6 +846,48 @@ storage_config: assert.Equal(t, "789abc", config.StorageConfig.NamedStores.AWS["store-2"].S3Config.SecretAccessKey.String()) }) + t.Run("named storage config (thanos) provided via config file is preserved", func(t *testing.T) { + namedStoresConfig := `common: + storage: + object_store: + s3: + endpoint: s3://common-bucket + region: us-east1 + access_key_id: abc123 + secret_access_key: def789 +storage_config: + object_store: + named_stores: + s3: + store-1: + endpoint: s3://foo-bucket + region: us-west1 + access_key_id: 123abc + secret_access_key: 789def + store-2: + endpoint: s3://bar-bucket + region: us-west2 + access_key_id: 456def + secret_access_key: 789abc` + config, _ := testContext(namedStoresConfig, nil) + + // should be set by common config + assert.Equal(t, "s3://common-bucket", config.StorageConfig.ObjectStore.S3.Endpoint) + assert.Equal(t, "us-east1", config.StorageConfig.ObjectStore.S3.Region) + assert.Equal(t, "abc123", config.StorageConfig.ObjectStore.S3.AccessKeyID) + assert.Equal(t, "def789", config.StorageConfig.ObjectStore.S3.SecretAccessKey.String()) + + assert.Equal(t, "s3://foo-bucket", config.StorageConfig.ObjectStore.NamedStores.S3["store-1"].Endpoint) + assert.Equal(t, "us-west1", config.StorageConfig.ObjectStore.NamedStores.S3["store-1"].Region) + assert.Equal(t, "123abc", config.StorageConfig.ObjectStore.NamedStores.S3["store-1"].AccessKeyID) + assert.Equal(t, "789def", config.StorageConfig.ObjectStore.NamedStores.S3["store-1"].SecretAccessKey.String()) + + assert.Equal(t, "s3://bar-bucket", config.StorageConfig.ObjectStore.NamedStores.S3["store-2"].Endpoint) + assert.Equal(t, "us-west2", config.StorageConfig.ObjectStore.NamedStores.S3["store-2"].Region) + assert.Equal(t, "456def", config.StorageConfig.ObjectStore.NamedStores.S3["store-2"].AccessKeyID) + assert.Equal(t, "789abc", config.StorageConfig.ObjectStore.NamedStores.S3["store-2"].SecretAccessKey.String()) + }) + t.Run("partial ruler config from file is honored for overriding things like bucket names", func(t *testing.T) { specificRulerConfig := `common: storage: @@ -2280,3 +2326,91 @@ func TestNamedStores_applyDefaults(t *testing.T) { assert.Equal(t, expected, (alibaba.OssConfig)(nsCfg.AlibabaCloud["store-8"])) }) } + +func TestBucketNamedStores_applyDefaults(t *testing.T) { + namedStoresConfig := `storage_config: + object_store: + named_stores: + s3: + store-1: + endpoint: s3.test + bucket_name: foobar + dualstack_enabled: false + azure: + store-2: + account_name: foo + container_name: bar + max_retries: 3 + gcs: + store-3: + bucket_name: foobar + filesystem: + store-4: + dir: foobar + swift: + store-5: + container_name: foobar + request_timeout: 30s +` + // make goconst happy + bucketName := "foobar" + + config, defaults, err := configWrapperFromYAML(t, namedStoresConfig, nil) + require.NoError(t, err) + + nsCfg := config.StorageConfig.ObjectStore.NamedStores + + t.Run("s3", func(t *testing.T) { + assert.Len(t, nsCfg.S3, 1) + + // expect the defaults to be set on named store config + expected := defaults.StorageConfig.ObjectStore.S3 + expected.BucketName = bucketName + expected.Endpoint = "s3.test" + // override defaults + expected.DualstackEnabled = false + + assert.Equal(t, expected, (s3.Config)(nsCfg.S3["store-1"])) + }) + + t.Run("azure", func(t *testing.T) { + assert.Len(t, nsCfg.Azure, 1) + + expected := defaults.StorageConfig.ObjectStore.Azure + expected.StorageAccountName = "foo" + expected.ContainerName = "bar" + // overrides defaults + expected.MaxRetries = 3 + + assert.Equal(t, expected, (azurebucket.Config)(nsCfg.Azure["store-2"])) + }) + + t.Run("gcs", func(t *testing.T) { + assert.Len(t, nsCfg.GCS, 1) + + expected := defaults.StorageConfig.ObjectStore.GCS + expected.BucketName = bucketName + + assert.Equal(t, expected, (gcs.Config)(nsCfg.GCS["store-3"])) + }) + + t.Run("filesystem", func(t *testing.T) { + assert.Len(t, nsCfg.Filesystem, 1) + + expected := defaults.StorageConfig.ObjectStore.Filesystem + expected.Directory = bucketName + + assert.Equal(t, expected, (filesystem.Config)(nsCfg.Filesystem["store-4"])) + }) + + t.Run("swift", func(t *testing.T) { + assert.Len(t, nsCfg.Swift, 1) + + expected := defaults.StorageConfig.ObjectStore.Swift + expected.ContainerName = bucketName + // override defaults + expected.RequestTimeout = 30 * time.Second + + assert.Equal(t, expected, (swift.Config)(nsCfg.Swift["store-5"])) + }) +} diff --git a/pkg/loki/validation.go b/pkg/loki/validation.go index 6e7e19cc44805..83f1cc6cfa748 100644 --- a/pkg/loki/validation.go +++ b/pkg/loki/validation.go @@ -4,6 +4,7 @@ import ( "fmt" "strings" + "github.com/grafana/loki/v3/pkg/storage/bucket" "github.com/grafana/loki/v3/pkg/storage/chunk/cache" "github.com/grafana/loki/v3/pkg/storage/config" "github.com/grafana/loki/v3/pkg/storage/types" @@ -97,7 +98,11 @@ func validateSchemaValues(c *Config) []error { errs = append(errs, fmt.Errorf("unrecognized `store` (index) type `%s`, choose one of: %s", cfg.IndexType, strings.Join(types.SupportedIndexTypes, ", "))) } - if !util.StringsContain(types.TestingStorageTypes, cfg.ObjectType) && + if c.StorageConfig.UseThanosObjstore { + if !util.StringsContain(bucket.SupportedBackends, cfg.ObjectType) && !c.StorageConfig.ObjectStore.NamedStores.Exists(cfg.ObjectType) { + errs = append(errs, fmt.Errorf("unrecognized `object_store` type `%s`, which also does not match any named_stores. Choose one of: %s. Or choose a named_store", cfg.ObjectType, strings.Join(bucket.SupportedBackends, ", "))) + } + } else if !util.StringsContain(types.TestingStorageTypes, cfg.ObjectType) && !util.StringsContain(types.SupportedStorageTypes, cfg.ObjectType) && !util.StringsContain(types.DeprecatedStorageTypes, cfg.ObjectType) { if !c.StorageConfig.NamedStores.Exists(cfg.ObjectType) { diff --git a/pkg/storage/bucket/client.go b/pkg/storage/bucket/client.go index 7d6ddb7bf1a28..327f6f3f9cab1 100644 --- a/pkg/storage/bucket/client.go +++ b/pkg/storage/bucket/client.go @@ -73,8 +73,8 @@ func init() { metrics = objstore.BucketMetrics(prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer), "") } -// StorageBackendConfig holds configuration for accessing long-term storage. -type StorageBackendConfig struct { +// Config holds configuration for accessing long-term storage. +type Config struct { // Backends S3 s3.Config `yaml:"s3"` GCS gcs.Config `yaml:"gcs"` @@ -84,22 +84,28 @@ type StorageBackendConfig struct { Alibaba oss.Config `yaml:"alibaba"` BOS bos.Config `yaml:"bos"` + StoragePrefix string `yaml:"storage_prefix"` + // Used to inject additional backends into the config. Allows for this config to // be embedded in multiple contexts and support non-object storage based backends. ExtraBackends []string `yaml:"-"` + + // Not used internally, meant to allow callers to wrap Buckets + // created using this config + Middlewares []func(objstore.InstrumentedBucket) (objstore.InstrumentedBucket, error) `yaml:"-"` } // Returns the SupportedBackends for the package and any custom backends injected into the config. -func (cfg *StorageBackendConfig) SupportedBackends() []string { +func (cfg *Config) SupportedBackends() []string { return append(SupportedBackends, cfg.ExtraBackends...) } // RegisterFlags registers the backend storage config. -func (cfg *StorageBackendConfig) RegisterFlags(f *flag.FlagSet) { +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.RegisterFlagsWithPrefix("", f) } -func (cfg *StorageBackendConfig) RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir string, f *flag.FlagSet) { +func (cfg *Config) RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir string, f *flag.FlagSet) { cfg.GCS.RegisterFlagsWithPrefix(prefix, f) cfg.S3.RegisterFlagsWithPrefix(prefix, f) cfg.Azure.RegisterFlagsWithPrefix(prefix, f) @@ -107,13 +113,21 @@ func (cfg *StorageBackendConfig) RegisterFlagsWithPrefixAndDefaultDirectory(pref cfg.Filesystem.RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir, f) cfg.Alibaba.RegisterFlagsWithPrefix(prefix, f) cfg.BOS.RegisterFlagsWithPrefix(prefix, f) + f.StringVar(&cfg.StoragePrefix, prefix+"storage-prefix", "", "Prefix for all objects stored in the backend storage. For simplicity, it may only contain digits and English alphabet letters.") } -func (cfg *StorageBackendConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { +func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { cfg.RegisterFlagsWithPrefixAndDefaultDirectory(prefix, "", f) } -func (cfg *StorageBackendConfig) Validate() error { +func (cfg *Config) Validate() error { + if cfg.StoragePrefix != "" { + acceptablePrefixCharacters := regexp.MustCompile(validPrefixCharactersRegex) + if !acceptablePrefixCharacters.MatchString(cfg.StoragePrefix) { + return ErrInvalidCharactersInStoragePrefix + } + } + if err := cfg.S3.Validate(); err != nil { return err } @@ -121,39 +135,17 @@ func (cfg *StorageBackendConfig) Validate() error { return nil } -// Config holds configuration for accessing long-term storage. -type Config struct { - StorageBackendConfig `yaml:",inline"` - StoragePrefix string `yaml:"storage_prefix"` - - // Not used internally, meant to allow callers to wrap Buckets - // created using this config - Middlewares []func(objstore.InstrumentedBucket) (objstore.InstrumentedBucket, error) `yaml:"-"` -} - -// RegisterFlags registers the backend storage config. -func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - cfg.RegisterFlagsWithPrefix("", f) -} - -func (cfg *Config) RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir string, f *flag.FlagSet) { - cfg.StorageBackendConfig.RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir, f) - f.StringVar(&cfg.StoragePrefix, prefix+"storage-prefix", "", "Prefix for all objects stored in the backend storage. For simplicity, it may only contain digits and English alphabet letters.") +type ConfigWithNamedStores struct { + Config `yaml:",inline"` + NamedStores NamedStores `yaml:"named_stores"` } -func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - cfg.RegisterFlagsWithPrefixAndDefaultDirectory(prefix, "", f) -} - -func (cfg *Config) Validate() error { - if cfg.StoragePrefix != "" { - acceptablePrefixCharacters := regexp.MustCompile(validPrefixCharactersRegex) - if !acceptablePrefixCharacters.MatchString(cfg.StoragePrefix) { - return ErrInvalidCharactersInStoragePrefix - } +func (cfg *ConfigWithNamedStores) Validate() error { + if err := cfg.Config.Validate(); err != nil { + return err } - return cfg.StorageBackendConfig.Validate() + return cfg.NamedStores.Validate() } func (cfg *Config) disableRetries(backend string) error { diff --git a/pkg/storage/bucket/named_stores.go b/pkg/storage/bucket/named_stores.go new file mode 100644 index 0000000000000..d9869e3bc9af7 --- /dev/null +++ b/pkg/storage/bucket/named_stores.go @@ -0,0 +1,204 @@ +package bucket + +import ( + "fmt" + "slices" + + "github.com/grafana/loki/v3/pkg/storage/bucket/azure" + "github.com/grafana/loki/v3/pkg/storage/bucket/filesystem" + "github.com/grafana/loki/v3/pkg/storage/bucket/gcs" + "github.com/grafana/loki/v3/pkg/storage/bucket/s3" + "github.com/grafana/loki/v3/pkg/storage/bucket/swift" + + "github.com/grafana/dskit/flagext" +) + +// NamedStores helps configure additional object stores from a given storage provider +type NamedStores struct { + Azure map[string]NamedAzureStorageConfig `yaml:"azure"` + Filesystem map[string]NamedFilesystemStorageConfig `yaml:"filesystem"` + GCS map[string]NamedGCSStorageConfig `yaml:"gcs"` + S3 map[string]NamedS3StorageConfig `yaml:"s3"` + Swift map[string]NamedSwiftStorageConfig `yaml:"swift"` + + // contains mapping from named store reference name to store type + storeType map[string]string `yaml:"-"` +} + +func (ns *NamedStores) Validate() error { + for name, s3Cfg := range ns.S3 { + if err := s3Cfg.Validate(); err != nil { + return fmt.Errorf("invalid S3 Storage config with name %s: %w", name, err) + } + } + + return ns.populateStoreType() +} + +func (ns *NamedStores) populateStoreType() error { + ns.storeType = make(map[string]string) + + checkForDuplicates := func(name string) error { + if slices.Contains(SupportedBackends, name) { + return fmt.Errorf("named store %q should not match with the name of a predefined storage type", name) + } + + if st, ok := ns.storeType[name]; ok { + return fmt.Errorf("named store %q is already defined under %s", name, st) + } + + return nil + } + + for name := range ns.S3 { + if err := checkForDuplicates(name); err != nil { + return err + } + ns.storeType[name] = S3 + } + + for name := range ns.Azure { + if err := checkForDuplicates(name); err != nil { + return err + } + ns.storeType[name] = Azure + } + + for name := range ns.Filesystem { + if err := checkForDuplicates(name); err != nil { + return err + } + ns.storeType[name] = Filesystem + } + + for name := range ns.GCS { + if err := checkForDuplicates(name); err != nil { + return err + } + ns.storeType[name] = GCS + } + + for name := range ns.Swift { + if err := checkForDuplicates(name); err != nil { + return err + } + ns.storeType[name] = Swift + } + + return nil +} + +func (ns *NamedStores) LookupStoreType(name string) (string, bool) { + st, ok := ns.storeType[name] + return st, ok +} + +func (ns *NamedStores) Exists(name string) bool { + _, ok := ns.storeType[name] + return ok +} + +// OverrideConfig overrides the store config with the named store config +func (ns *NamedStores) OverrideConfig(storeCfg *Config, namedStore string) error { + storeType, ok := ns.LookupStoreType(namedStore) + if !ok { + return fmt.Errorf("Unrecognized named storage config %s", namedStore) + } + + switch storeType { + case GCS: + nsCfg, ok := ns.GCS[namedStore] + if !ok { + return fmt.Errorf("Unrecognized named gcs storage config %s", namedStore) + } + + storeCfg.GCS = (gcs.Config)(nsCfg) + case S3: + nsCfg, ok := ns.S3[namedStore] + if !ok { + return fmt.Errorf("Unrecognized named s3 storage config %s", namedStore) + } + + storeCfg.S3 = (s3.Config)(nsCfg) + case Filesystem: + nsCfg, ok := ns.Filesystem[namedStore] + if !ok { + return fmt.Errorf("Unrecognized named filesystem storage config %s", namedStore) + } + + storeCfg.Filesystem = (filesystem.Config)(nsCfg) + case Azure: + nsCfg, ok := ns.Azure[namedStore] + if !ok { + return fmt.Errorf("Unrecognized named azure storage config %s", namedStore) + } + + storeCfg.Azure = (azure.Config)(nsCfg) + case Swift: + nsCfg, ok := ns.Swift[namedStore] + if !ok { + return fmt.Errorf("Unrecognized named swift storage config %s", namedStore) + } + + storeCfg.Swift = (swift.Config)(nsCfg) + default: + return fmt.Errorf("Unrecognized named storage type: %s", storeType) + } + + return nil +} + +// Storage configs defined as Named stores don't get any defaults as they do not +// register flags. To get around this we implement Unmarshaler interface that +// assigns the defaults before calling unmarshal. + +// We cannot implement Unmarshaler directly on s3.Config or other stores +// as it would end up overriding values set as part of ApplyDynamicConfig(). +// Note: we unmarshal a second time after applying dynamic configs +// +// Implementing the Unmarshaler for Named*StorageConfig types is fine as +// we do not apply any dynamic config on them. + +type NamedS3StorageConfig s3.Config + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (cfg *NamedS3StorageConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + flagext.DefaultValues((*s3.Config)(cfg)) + return unmarshal((*s3.Config)(cfg)) +} + +func (cfg *NamedS3StorageConfig) Validate() error { + return (*s3.Config)(cfg).Validate() +} + +type NamedGCSStorageConfig gcs.Config + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (cfg *NamedGCSStorageConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + flagext.DefaultValues((*gcs.Config)(cfg)) + return unmarshal((*gcs.Config)(cfg)) +} + +type NamedAzureStorageConfig azure.Config + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (cfg *NamedAzureStorageConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + flagext.DefaultValues((*azure.Config)(cfg)) + return unmarshal((*azure.Config)(cfg)) +} + +type NamedSwiftStorageConfig swift.Config + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (cfg *NamedSwiftStorageConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + flagext.DefaultValues((*swift.Config)(cfg)) + return unmarshal((*swift.Config)(cfg)) +} + +type NamedFilesystemStorageConfig filesystem.Config + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (cfg *NamedFilesystemStorageConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + flagext.DefaultValues((*filesystem.Config)(cfg)) + return unmarshal((*filesystem.Config)(cfg)) +} diff --git a/pkg/storage/bucket/named_stores_test.go b/pkg/storage/bucket/named_stores_test.go new file mode 100644 index 0000000000000..0fd6d9f701909 --- /dev/null +++ b/pkg/storage/bucket/named_stores_test.go @@ -0,0 +1,94 @@ +package bucket + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/storage/bucket/gcs" +) + +func TestNamedStores_populateStoreType(t *testing.T) { + t.Run("found duplicates", func(t *testing.T) { + ns := NamedStores{ + S3: map[string]NamedS3StorageConfig{ + "store-1": {}, + "store-2": {}, + }, + GCS: map[string]NamedGCSStorageConfig{ + "store-1": {}, + }, + } + + err := ns.populateStoreType() + require.ErrorContains(t, err, `named store "store-1" is already defined under`) + + }) + + t.Run("illegal store name", func(t *testing.T) { + ns := NamedStores{ + GCS: map[string]NamedGCSStorageConfig{ + "s3": {}, + }, + } + + err := ns.populateStoreType() + require.ErrorContains(t, err, `named store "s3" should not match with the name of a predefined storage type`) + + }) + + t.Run("lookup populated entries", func(t *testing.T) { + ns := NamedStores{ + S3: map[string]NamedS3StorageConfig{ + "store-1": {}, + "store-2": {}, + }, + GCS: map[string]NamedGCSStorageConfig{ + "store-3": {}, + }, + } + + err := ns.populateStoreType() + require.NoError(t, err) + + storeType, ok := ns.LookupStoreType("store-1") + require.True(t, ok) + require.Equal(t, S3, storeType) + + storeType, ok = ns.LookupStoreType("store-2") + require.True(t, ok) + require.Equal(t, S3, storeType) + + storeType, ok = ns.LookupStoreType("store-3") + require.True(t, ok) + require.Equal(t, GCS, storeType) + + _, ok = ns.LookupStoreType("store-4") + require.False(t, ok) + }) +} + +func TestNamedStores_OverrideConfig(t *testing.T) { + namedStoreCfg := NamedStores{ + GCS: map[string]NamedGCSStorageConfig{ + "store-1": { + BucketName: "bar", + ChunkBufferSize: 100, + }, + "store-2": { + BucketName: "baz", + }, + }, + } + require.NoError(t, namedStoreCfg.populateStoreType()) + + storeCfg := Config{ + GCS: gcs.Config{ + BucketName: "foo", + }, + } + err := namedStoreCfg.OverrideConfig(&storeCfg, "store-1") + require.NoError(t, err) + require.Equal(t, "bar", storeCfg.GCS.BucketName) + require.Equal(t, 100, storeCfg.GCS.ChunkBufferSize) +} diff --git a/pkg/storage/bucket/object_client_adapter.go b/pkg/storage/bucket/object_client_adapter.go index 011ad0ed624ad..9930f176163e3 100644 --- a/pkg/storage/bucket/object_client_adapter.go +++ b/pkg/storage/bucket/object_client_adapter.go @@ -26,14 +26,27 @@ type ObjectClientAdapter struct { isRetryableErr func(err error) bool } -func NewObjectClient(ctx context.Context, backend string, cfg Config, component string, hedgingCfg hedging.Config, disableRetries bool, logger log.Logger) (*ObjectClientAdapter, error) { +func NewObjectClient(ctx context.Context, backend string, cfg ConfigWithNamedStores, component string, hedgingCfg hedging.Config, disableRetries bool, logger log.Logger) (*ObjectClientAdapter, error) { + var ( + storeType = backend + storeCfg = cfg.Config + ) + + if st, ok := cfg.NamedStores.LookupStoreType(backend); ok { + storeType = st + // override config with values from named store config + if err := cfg.NamedStores.OverrideConfig(&storeCfg, backend); err != nil { + return nil, err + } + } + if disableRetries { - if err := cfg.disableRetries(backend); err != nil { + if err := storeCfg.disableRetries(storeType); err != nil { return nil, fmt.Errorf("create bucket: %w", err) } } - bucket, err := NewClient(ctx, backend, cfg, component, logger) + bucket, err := NewClient(ctx, storeType, storeCfg, component, logger) if err != nil { return nil, fmt.Errorf("create bucket: %w", err) } @@ -45,11 +58,11 @@ func NewObjectClient(ctx context.Context, backend string, cfg Config, component return nil, fmt.Errorf("create hedged transport: %w", err) } - if err := cfg.configureTransport(backend, hedgedTrasport); err != nil { + if err := storeCfg.configureTransport(storeType, hedgedTrasport); err != nil { return nil, fmt.Errorf("create hedged bucket: %w", err) } - hedgedBucket, err = NewClient(ctx, backend, cfg, component, logger) + hedgedBucket, err = NewClient(ctx, storeType, storeCfg, component, logger) if err != nil { return nil, fmt.Errorf("create hedged bucket: %w", err) } @@ -66,7 +79,7 @@ func NewObjectClient(ctx context.Context, backend string, cfg Config, component }, } - switch backend { + switch storeType { case GCS: o.isRetryableErr = gcp.IsRetryableErr case S3: diff --git a/pkg/storage/bucket/object_client_adapter_test.go b/pkg/storage/bucket/object_client_adapter_test.go index 341b59566333a..7b1fe9c275638 100644 --- a/pkg/storage/bucket/object_client_adapter_test.go +++ b/pkg/storage/bucket/object_client_adapter_test.go @@ -97,8 +97,8 @@ func TestObjectClientAdapter_List(t *testing.T) { require.NoError(t, newBucket.Upload(context.Background(), "depply/nested/folder/b", buff)) require.NoError(t, newBucket.Upload(context.Background(), "depply/nested/folder/c", buff)) - client, err := NewObjectClient(context.Background(), "filesystem", Config{ - StorageBackendConfig: StorageBackendConfig{ + client, err := NewObjectClient(context.Background(), "filesystem", ConfigWithNamedStores{ + Config: Config{ Filesystem: config, }, }, "test", hedging.Config{}, false, log.NewNopLogger()) diff --git a/pkg/storage/factory.go b/pkg/storage/factory.go index bc2257a64a876..e0477ab1e7acb 100644 --- a/pkg/storage/factory.go +++ b/pkg/storage/factory.go @@ -295,8 +295,8 @@ type Config struct { DisableBroadIndexQueries bool `yaml:"disable_broad_index_queries"` MaxParallelGetChunk int `yaml:"max_parallel_get_chunk"` - UseThanosObjstore bool `yaml:"use_thanos_objstore" doc:"hidden"` - ObjectStore bucket.Config `yaml:"object_store" doc:"hidden"` + UseThanosObjstore bool `yaml:"use_thanos_objstore" doc:"hidden"` + ObjectStore bucket.ConfigWithNamedStores `yaml:"object_store" doc:"hidden"` MaxChunkBatchSize int `yaml:"max_chunk_batch_size"` BoltDBShipperConfig boltdb.IndexCfg `yaml:"boltdb_shipper" doc:"description=Configures storing index in an Object Store (GCS/S3/Azure/Swift/COS/Filesystem) in the form of boltdb files. Required fields only required when boltdb-shipper is defined in config."` @@ -472,6 +472,32 @@ func NewIndexClient(component string, periodCfg config.PeriodConfig, tableRange func NewChunkClient(name, component string, cfg Config, schemaCfg config.SchemaConfig, cc congestion.Controller, registerer prometheus.Registerer, clientMetrics ClientMetrics, logger log.Logger) (client.Client, error) { var storeType = name + if cfg.UseThanosObjstore { + // Check if this is a named store and get its type + if st, ok := cfg.ObjectStore.NamedStores.LookupStoreType(name); ok { + storeType = st + } + + var ( + c client.ObjectClient + err error + ) + c, err = NewObjectClient(name, component, cfg, clientMetrics) + if err != nil { + return nil, err + } + + var encoder client.KeyEncoder + if storeType == bucket.Filesystem { + encoder = client.FSEncoder + } else if cfg.CongestionControl.Enabled { + // Apply congestion control wrapper for non-filesystem storage + c = cc.Wrap(c) + } + + return client.NewClientWithMaxParallel(c, encoder, cfg.MaxParallelGetChunk, schemaCfg), nil + } + // lookup storeType for named stores if nsType, ok := cfg.NamedStores.storeType[name]; ok { storeType = nsType @@ -616,10 +642,10 @@ func (c *ClientMetrics) Unregister() { // NewObjectClient makes a new StorageClient with the prefix in the front. func NewObjectClient(name, component string, cfg Config, clientMetrics ClientMetrics) (client.ObjectClient, error) { if cfg.UseThanosObjstore { - return bucket.NewObjectClient(context.Background(), name, cfg.ObjectStore, component, cfg.Hedging, cfg.CongestionControl.Enabled, util_log.Logger) + return bucket.NewObjectClient(context.Background(), name, cfg.ObjectStore, component, cfg.Hedging, false, util_log.Logger) } - actual, err := internalNewObjectClient(name, component, cfg, clientMetrics) + actual, err := internalNewObjectClient(name, cfg, clientMetrics) if err != nil { return nil, err } @@ -633,7 +659,7 @@ func NewObjectClient(name, component string, cfg Config, clientMetrics ClientMet } // internalNewObjectClient makes the underlying StorageClient of the desired types. -func internalNewObjectClient(storeName, component string, cfg Config, clientMetrics ClientMetrics) (client.ObjectClient, error) { +func internalNewObjectClient(storeName string, cfg Config, clientMetrics ClientMetrics) (client.ObjectClient, error) { var ( namedStore string storeType = storeName