From ee8b34dee9f77a060b36200fc75167d8d591f843 Mon Sep 17 00:00:00 2001 From: Ashwanth Goli Date: Tue, 29 Oct 2024 16:21:55 +0530 Subject: [PATCH 01/13] thanos: add support for named stores --- pkg/loki/validation.go | 7 +- pkg/storage/bucket/client.go | 88 ++++++++++++++--- pkg/storage/bucket/named_stores.go | 149 +++++++++++++++++++++++++++++ 3 files changed, 231 insertions(+), 13 deletions(-) create mode 100644 pkg/storage/bucket/named_stores.go diff --git a/pkg/loki/validation.go b/pkg/loki/validation.go index 6e7e19cc44805..83f1cc6cfa748 100644 --- a/pkg/loki/validation.go +++ b/pkg/loki/validation.go @@ -4,6 +4,7 @@ import ( "fmt" "strings" + "github.com/grafana/loki/v3/pkg/storage/bucket" "github.com/grafana/loki/v3/pkg/storage/chunk/cache" "github.com/grafana/loki/v3/pkg/storage/config" "github.com/grafana/loki/v3/pkg/storage/types" @@ -97,7 +98,11 @@ func validateSchemaValues(c *Config) []error { errs = append(errs, fmt.Errorf("unrecognized `store` (index) type `%s`, choose one of: %s", cfg.IndexType, strings.Join(types.SupportedIndexTypes, ", "))) } - if !util.StringsContain(types.TestingStorageTypes, cfg.ObjectType) && + if c.StorageConfig.UseThanosObjstore { + if !util.StringsContain(bucket.SupportedBackends, cfg.ObjectType) && !c.StorageConfig.ObjectStore.NamedStores.Exists(cfg.ObjectType) { + errs = append(errs, fmt.Errorf("unrecognized `object_store` type `%s`, which also does not match any named_stores. Choose one of: %s. 
Or choose a named_store", cfg.ObjectType, strings.Join(bucket.SupportedBackends, ", "))) + } + } else if !util.StringsContain(types.TestingStorageTypes, cfg.ObjectType) && !util.StringsContain(types.SupportedStorageTypes, cfg.ObjectType) && !util.StringsContain(types.DeprecatedStorageTypes, cfg.ObjectType) { if !c.StorageConfig.NamedStores.Exists(cfg.ObjectType) { diff --git a/pkg/storage/bucket/client.go b/pkg/storage/bucket/client.go index 0bd8ec4e92e53..f8d27040e4237 100644 --- a/pkg/storage/bucket/client.go +++ b/pkg/storage/bucket/client.go @@ -4,6 +4,7 @@ import ( "context" "errors" "flag" + "fmt" "regexp" "github.com/go-kit/log" @@ -56,6 +57,8 @@ type StorageBackendConfig struct { Swift swift.Config `yaml:"swift"` Filesystem filesystem.Config `yaml:"filesystem"` + NamedStores NamedStores `yaml:"named_stores"` + // Used to inject additional backends into the config. Allows for this config to // be embedded in multiple contexts and support non-object storage based backends. ExtraBackends []string `yaml:"-"` @@ -84,12 +87,11 @@ func (cfg *StorageBackendConfig) RegisterFlagsWithPrefix(prefix string, f *flag. } func (cfg *StorageBackendConfig) Validate() error { - // TODO: enable validation when s3 flags are registered - // if err := cfg.S3.Validate(); err != nil { - // return err - //} + if err := cfg.S3.Validate(); err != nil { + return err + } - return nil + return cfg.NamedStores.Validate() } // Config holds configuration for accessing long-term storage. 
@@ -124,28 +126,90 @@ func (cfg *Config) Validate() error { } } - return cfg.StorageBackendConfig.Validate() + if err := cfg.StorageBackendConfig.Validate(); err != nil { + return err + } + + return cfg.NamedStores.Validate() } // NewClient creates a new bucket client based on the configured backend func NewClient(ctx context.Context, backend string, cfg Config, name string, logger log.Logger) (objstore.InstrumentedBucket, error) { var ( + storeType = backend + namedStore bool + client objstore.Bucket err error ) + if st, ok := cfg.NamedStores.storeType[backend]; ok { + namedStore = true + storeType = st + } + // TODO: add support for other backends that loki already supports - switch backend { + switch storeType { case S3: - client, err = s3.NewBucketClient(cfg.S3, name, logger) + s3Cfg := cfg.S3 + if namedStore { + nsCfg, ok := cfg.NamedStores.S3[backend] + if !ok { + return nil, fmt.Errorf("Unrecognized named s3 storage config %s", backend) + } + + s3Cfg = (s3.Config)(nsCfg) + } + + client, err = s3.NewBucketClient(s3Cfg, name, logger) case GCS: - client, err = gcs.NewBucketClient(ctx, cfg.GCS, name, logger) + gcsCfg := cfg.GCS + if namedStore { + nsCfg, ok := cfg.NamedStores.GCS[backend] + if !ok { + return nil, fmt.Errorf("Unrecognized named gcs storage config %s", backend) + } + + gcsCfg = (gcs.Config)(nsCfg) + } + + client, err = gcs.NewBucketClient(ctx, gcsCfg, name, logger) case Azure: - client, err = azure.NewBucketClient(cfg.Azure, name, logger) + azureCfg := cfg.Azure + if namedStore { + nsCfg, ok := cfg.NamedStores.Azure[backend] + if !ok { + return nil, fmt.Errorf("Unrecognized named azure storage config %s", backend) + } + + azureCfg = (azure.Config)(nsCfg) + } + + client, err = azure.NewBucketClient(azureCfg, name, logger) case Swift: - client, err = swift.NewBucketClient(cfg.Swift, name, logger) + swiftCfg := cfg.Swift + if namedStore { + nsCfg, ok := cfg.NamedStores.Swift[backend] + if !ok { + return nil, fmt.Errorf("Unrecognized named swift 
storage config %s", backend) + } + + swiftCfg = (swift.Config)(nsCfg) + } + + client, err = swift.NewBucketClient(swiftCfg, name, logger) case Filesystem: - client, err = filesystem.NewBucketClient(cfg.Filesystem) + fsCfg := cfg.Filesystem + if namedStore { + nsCfg, ok := cfg.NamedStores.Filesystem[backend] + if !ok { + return nil, fmt.Errorf("Unrecognized named swift storage config %s", backend) + } + + fsCfg = (filesystem.Config)(nsCfg) + } + + client, err = filesystem.NewBucketClient(fsCfg) default: return nil, ErrUnsupportedStorageBackend } diff --git a/pkg/storage/bucket/named_stores.go b/pkg/storage/bucket/named_stores.go new file mode 100644 index 0000000000000..56c322a53e947 --- /dev/null +++ b/pkg/storage/bucket/named_stores.go @@ -0,0 +1,149 @@ +package bucket + +import ( + "fmt" + + "github.com/grafana/loki/v3/pkg/storage/bucket/azure" + "github.com/grafana/loki/v3/pkg/storage/bucket/filesystem" + "github.com/grafana/loki/v3/pkg/storage/bucket/gcs" + "github.com/grafana/loki/v3/pkg/storage/bucket/s3" + "github.com/grafana/loki/v3/pkg/storage/bucket/swift" + + "github.com/grafana/dskit/flagext" +) + +// NamedStores helps configure additional object stores from a given storage provider +type NamedStores struct { + Azure map[string]NamedAzureStorageConfig `yaml:"azure"` + Filesystem map[string]NamedFilesystemStorageConfig `yaml:"filesystem"` + GCS map[string]NamedGCSStorageConfig `yaml:"gcs"` + S3 map[string]NamedS3StorageConfig `yaml:"s3"` + Swift map[string]NamedSwiftStorageConfig `yaml:"swift"` + + // contains mapping from named store reference name to store type + storeType map[string]string `yaml:"-"` +} + +func (ns *NamedStores) Validate() error { + for name, s3Cfg := range ns.S3 { + if err := s3Cfg.Validate(); err != nil { + return fmt.Errorf("invalid S3 Storage config with name %s: %w", name, err) + } + } + + return ns.populateStoreType() +} + +func (ns *NamedStores) populateStoreType() error { + ns.storeType = make(map[string]string) + + 
checkForDuplicates := func(name string) error { + switch name { + case S3, GCS, Azure, Swift, Filesystem: + return fmt.Errorf("named store %q should not match with the name of a predefined storage type", name) + } + + if st, ok := ns.storeType[name]; ok { + return fmt.Errorf("named store %q is already defined under %s", name, st) + } + + return nil + } + + for name := range ns.S3 { + if err := checkForDuplicates(name); err != nil { + return err + } + ns.storeType[name] = S3 + } + + for name := range ns.Azure { + if err := checkForDuplicates(name); err != nil { + return err + } + ns.storeType[name] = Azure + } + + for name := range ns.Filesystem { + if err := checkForDuplicates(name); err != nil { + return err + } + ns.storeType[name] = Filesystem + } + + for name := range ns.GCS { + if err := checkForDuplicates(name); err != nil { + return err + } + ns.storeType[name] = GCS + } + + for name := range ns.Swift { + if err := checkForDuplicates(name); err != nil { + return err + } + ns.storeType[name] = Swift + } + + return nil +} + +func (ns *NamedStores) Exists(name string) bool { + _, ok := ns.storeType[name] + return ok +} + +// Storage configs defined as Named stores don't get any defaults as they do not +// register flags. To get around this we implement Unmarshaler interface that +// assigns the defaults before calling unmarshal. + +// We cannot implement Unmarshaler directly on s3.Config or other stores +// as it would end up overriding values set as part of ApplyDynamicConfig(). +// Note: we unmarshal a second time after applying dynamic configs +// +// Implementing the Unmarshaler for Named*StorageConfig types is fine as +// we do not apply any dynamic config on them. + +type NamedS3StorageConfig s3.Config + +// UnmarshalYAML implements the yaml.Unmarshaler interface. 
+func (cfg *NamedS3StorageConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + flagext.DefaultValues((*s3.Config)(cfg)) + return unmarshal((*s3.Config)(cfg)) +} + +func (cfg *NamedS3StorageConfig) Validate() error { + return (*s3.Config)(cfg).Validate() +} + +type NamedGCSStorageConfig gcs.Config + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (cfg *NamedGCSStorageConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + flagext.DefaultValues((*gcs.Config)(cfg)) + return unmarshal((*gcs.Config)(cfg)) +} + +type NamedAzureStorageConfig azure.Config + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (cfg *NamedAzureStorageConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + flagext.DefaultValues((*azure.Config)(cfg)) + return unmarshal((*azure.Config)(cfg)) +} + +type NamedSwiftStorageConfig swift.Config + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (cfg *NamedSwiftStorageConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + flagext.DefaultValues((*swift.Config)(cfg)) + return unmarshal((*swift.Config)(cfg)) +} + +type NamedFilesystemStorageConfig filesystem.Config + +// UnmarshalYAML implements the yaml.Unmarshaler interface. 
+func (cfg *NamedFilesystemStorageConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + flagext.DefaultValues((*filesystem.Config)(cfg)) + return unmarshal((*filesystem.Config)(cfg)) +} From cfceecbfe06aa3ace5bca8858259a644486e2ba2 Mon Sep 17 00:00:00 2001 From: Ashwanth Goli Date: Wed, 30 Oct 2024 16:24:09 +0530 Subject: [PATCH 02/13] only include named store in storage_config.object_stores --- pkg/loki/config_wrapper.go | 4 +- pkg/loki/config_wrapper_test.go | 134 +++++++++++++++++ pkg/ruler/base/storage.go | 2 +- pkg/storage/bucket/azure/bucket_client.go | 13 +- pkg/storage/bucket/azure/config.go | 4 - pkg/storage/bucket/client.go | 136 +++++------------- pkg/storage/bucket/client_test.go | 2 +- pkg/storage/bucket/gcs/bucket_client.go | 6 +- pkg/storage/bucket/gcs/config.go | 4 - pkg/storage/bucket/named_stores.go | 54 +++++++ pkg/storage/bucket/object_client_adapter.go | 57 ++++++-- .../bucket/object_client_adapter_test.go | 10 +- pkg/storage/bucket/s3/bucket_client.go | 17 +-- pkg/storage/bucket/s3/config.go | 4 - pkg/storage/bucket/sse_bucket_client_test.go | 2 +- pkg/storage/bucket/swift/bucket_client.go | 6 +- .../client/aws/s3_thanos_object_client.go | 44 ------ .../blob_storage_thanos_object_client.go | 44 ------ .../client/gcp/gcs_thanos_object_client.go | 44 ------ pkg/storage/factory.go | 19 +-- 20 files changed, 295 insertions(+), 311 deletions(-) delete mode 100644 pkg/storage/chunk/client/aws/s3_thanos_object_client.go delete mode 100644 pkg/storage/chunk/client/azure/blob_storage_thanos_object_client.go delete mode 100644 pkg/storage/chunk/client/gcp/gcs_thanos_object_client.go diff --git a/pkg/loki/config_wrapper.go b/pkg/loki/config_wrapper.go index a09792cd403ae..ef27c48d4716a 100644 --- a/pkg/loki/config_wrapper.go +++ b/pkg/loki/config_wrapper.go @@ -566,9 +566,9 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error { } } - if !reflect.DeepEqual(cfg.Common.Storage.ObjectStore, defaults.StorageConfig.ObjectStore) { + if 
!reflect.DeepEqual(cfg.Common.Storage.ObjectStore, defaults.StorageConfig.ObjectStore.Config) { applyConfig = func(r *ConfigWrapper) { - r.StorageConfig.ObjectStore = r.Common.Storage.ObjectStore + r.StorageConfig.ObjectStore.Config = r.Common.Storage.ObjectStore } } diff --git a/pkg/loki/config_wrapper_test.go b/pkg/loki/config_wrapper_test.go index 47d92cd8a92cb..203baecf4aec0 100644 --- a/pkg/loki/config_wrapper_test.go +++ b/pkg/loki/config_wrapper_test.go @@ -16,6 +16,10 @@ import ( "github.com/grafana/loki/v3/pkg/distributor" "github.com/grafana/loki/v3/pkg/loki/common" + azurebucket "github.com/grafana/loki/v3/pkg/storage/bucket/azure" + "github.com/grafana/loki/v3/pkg/storage/bucket/filesystem" + "github.com/grafana/loki/v3/pkg/storage/bucket/gcs" + "github.com/grafana/loki/v3/pkg/storage/bucket/s3" "github.com/grafana/loki/v3/pkg/storage/bucket/swift" "github.com/grafana/loki/v3/pkg/storage/chunk/client/alibaba" "github.com/grafana/loki/v3/pkg/storage/chunk/client/aws" @@ -842,6 +846,48 @@ storage_config: assert.Equal(t, "789abc", config.StorageConfig.NamedStores.AWS["store-2"].S3Config.SecretAccessKey.String()) }) + t.Run("named storage config (thanos) provided via config file is preserved", func(t *testing.T) { + namedStoresConfig := `common: + storage: + object_store: + s3: + endpoint: s3://common-bucket + region: us-east1 + access_key_id: abc123 + secret_access_key: def789 +storage_config: + object_store: + named_stores: + s3: + store-1: + endpoint: s3://foo-bucket + region: us-west1 + access_key_id: 123abc + secret_access_key: 789def + store-2: + endpoint: s3://bar-bucket + region: us-west2 + access_key_id: 456def + secret_access_key: 789abc` + config, _ := testContext(namedStoresConfig, nil) + + // should be set by common config + assert.Equal(t, "s3://common-bucket", config.StorageConfig.ObjectStore.S3.Endpoint) + assert.Equal(t, "us-east1", config.StorageConfig.ObjectStore.S3.Region) + assert.Equal(t, "abc123", 
config.StorageConfig.ObjectStore.S3.AccessKeyID) + assert.Equal(t, "def789", config.StorageConfig.ObjectStore.S3.SecretAccessKey.String()) + + assert.Equal(t, "s3://foo-bucket", config.StorageConfig.ObjectStore.NamedStores.S3["store-1"].Endpoint) + assert.Equal(t, "us-west1", config.StorageConfig.ObjectStore.NamedStores.S3["store-1"].Region) + assert.Equal(t, "123abc", config.StorageConfig.ObjectStore.NamedStores.S3["store-1"].AccessKeyID) + assert.Equal(t, "789def", config.StorageConfig.ObjectStore.NamedStores.S3["store-1"].SecretAccessKey.String()) + + assert.Equal(t, "s3://bar-bucket", config.StorageConfig.ObjectStore.NamedStores.S3["store-2"].Endpoint) + assert.Equal(t, "us-west2", config.StorageConfig.ObjectStore.NamedStores.S3["store-2"].Region) + assert.Equal(t, "456def", config.StorageConfig.ObjectStore.NamedStores.S3["store-2"].AccessKeyID) + assert.Equal(t, "789abc", config.StorageConfig.ObjectStore.NamedStores.S3["store-2"].SecretAccessKey.String()) + }) + t.Run("partial ruler config from file is honored for overriding things like bucket names", func(t *testing.T) { specificRulerConfig := `common: storage: @@ -2280,3 +2326,91 @@ func TestNamedStores_applyDefaults(t *testing.T) { assert.Equal(t, expected, (alibaba.OssConfig)(nsCfg.AlibabaCloud["store-8"])) }) } + +func TestBucketNamedStores_applyDefaults(t *testing.T) { + namedStoresConfig := `storage_config: + object_store: + named_stores: + s3: + store-1: + endpoint: s3.test + bucket_name: foobar + dualstack_enabled: false + azure: + store-2: + account_name: foo + container_name: bar + max_retries: 3 + gcs: + store-3: + bucket_name: foobar + filesystem: + store-4: + dir: foobar + swift: + store-5: + container_name: foobar + request_timeout: 30s +` + // make goconst happy + bucketName := "foobar" + + config, defaults, err := configWrapperFromYAML(t, namedStoresConfig, nil) + require.NoError(t, err) + + nsCfg := config.StorageConfig.ObjectStore.NamedStores + + t.Run("s3", func(t *testing.T) { + 
assert.Len(t, nsCfg.S3, 1) + + // expect the defaults to be set on named store config + expected := defaults.StorageConfig.ObjectStore.S3 + expected.BucketName = bucketName + expected.Endpoint = "s3.test" + // override defaults + expected.DualstackEnabled = false + + assert.Equal(t, expected, (s3.Config)(nsCfg.S3["store-1"])) + }) + + t.Run("azure", func(t *testing.T) { + assert.Len(t, nsCfg.Azure, 1) + + expected := defaults.StorageConfig.ObjectStore.Azure + expected.StorageAccountName = "foo" + expected.ContainerName = "bar" + // overrides defaults + expected.MaxRetries = 3 + + assert.Equal(t, expected, (azurebucket.Config)(nsCfg.Azure["store-2"])) + }) + + t.Run("gcs", func(t *testing.T) { + assert.Len(t, nsCfg.GCS, 1) + + expected := defaults.StorageConfig.ObjectStore.GCS + expected.BucketName = bucketName + + assert.Equal(t, expected, (gcs.Config)(nsCfg.GCS["store-3"])) + }) + + t.Run("filesystem", func(t *testing.T) { + assert.Len(t, nsCfg.Filesystem, 1) + + expected := defaults.StorageConfig.ObjectStore.Filesystem + expected.Directory = bucketName + + assert.Equal(t, expected, (filesystem.Config)(nsCfg.Filesystem["store-4"])) + }) + + t.Run("swift", func(t *testing.T) { + assert.Len(t, nsCfg.Swift, 1) + + expected := defaults.StorageConfig.ObjectStore.Swift + expected.ContainerName = bucketName + // override defaults + expected.RequestTimeout = 30 * time.Second + + assert.Equal(t, expected, (swift.Config)(nsCfg.Swift["store-5"])) + }) +} diff --git a/pkg/ruler/base/storage.go b/pkg/ruler/base/storage.go index 068718f5491a6..7a1222a7a9fae 100644 --- a/pkg/ruler/base/storage.go +++ b/pkg/ruler/base/storage.go @@ -127,7 +127,7 @@ func NewRuleStore(ctx context.Context, cfg rulestore.Config, cfgProvider bucket. 
return local.NewLocalRulesClient(cfg.Local, loader) } - bucketClient, err := bucket.NewClient(ctx, cfg.Backend, cfg.Config, "ruler-storage", logger) + bucketClient, err := bucket.NewClient(ctx, cfg.Backend, cfg.Config, "ruler-storage", logger, nil) if err != nil { return nil, err } diff --git a/pkg/storage/bucket/azure/bucket_client.go b/pkg/storage/bucket/azure/bucket_client.go index 0cd5e6b3bacff..e1f796d2c9373 100644 --- a/pkg/storage/bucket/azure/bucket_client.go +++ b/pkg/storage/bucket/azure/bucket_client.go @@ -8,11 +8,7 @@ import ( "github.com/thanos-io/objstore/providers/azure" ) -func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucket, error) { - return newBucketClient(cfg, name, logger, azure.NewBucketWithConfig) -} - -func newBucketClient(cfg Config, name string, logger log.Logger, factory func(log.Logger, azure.Config, string, http.RoundTripper) (*azure.Bucket, error)) (objstore.Bucket, error) { +func NewBucketClient(cfg Config, name string, logger log.Logger, rt http.RoundTripper) (objstore.Bucket, error) { // Start with default config to make sure that all parameters are set to sensible values, especially // HTTP Config field. 
bucketConfig := azure.DefaultConfig @@ -28,10 +24,5 @@ func newBucketClient(cfg Config, name string, logger log.Logger, factory func(lo bucketConfig.Endpoint = cfg.Endpoint } - var rt http.RoundTripper - if cfg.Transport != nil { - rt = cfg.Transport - } - - return factory(logger, bucketConfig, name, rt) + return azure.NewBucketWithConfig(logger, bucketConfig, name, rt) } diff --git a/pkg/storage/bucket/azure/config.go b/pkg/storage/bucket/azure/config.go index ac8037b6b7819..cdbfe52ee9252 100644 --- a/pkg/storage/bucket/azure/config.go +++ b/pkg/storage/bucket/azure/config.go @@ -2,7 +2,6 @@ package azure import ( "flag" - "net/http" "github.com/grafana/dskit/flagext" ) @@ -16,9 +15,6 @@ type Config struct { Endpoint string `yaml:"endpoint_suffix"` MaxRetries int `yaml:"max_retries"` UserAssignedID string `yaml:"user_assigned_id"` - - // Allow upstream callers to inject a round tripper - Transport http.RoundTripper `yaml:"-"` } // RegisterFlags registers the flags for Azure storage diff --git a/pkg/storage/bucket/client.go b/pkg/storage/bucket/client.go index f8d27040e4237..2878f885eb837 100644 --- a/pkg/storage/bucket/client.go +++ b/pkg/storage/bucket/client.go @@ -4,7 +4,7 @@ import ( "context" "errors" "flag" - "fmt" + "net/http" "regexp" "github.com/go-kit/log" @@ -48,8 +48,8 @@ var ( metrics = objstore.BucketMetrics(prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer), "") ) -// StorageBackendConfig holds configuration for accessing long-term storage. -type StorageBackendConfig struct { +// Config holds configuration for accessing long-term storage. +type Config struct { // Backends S3 s3.Config `yaml:"s3"` GCS gcs.Config `yaml:"gcs"` @@ -57,60 +57,33 @@ type StorageBackendConfig struct { Swift swift.Config `yaml:"swift"` Filesystem filesystem.Config `yaml:"filesystem"` - NamedStores NamedStores `yaml:"named_stores"` + StoragePrefix string `yaml:"storage_prefix"` // Used to inject additional backends into the config. 
Allows for this config to // be embedded in multiple contexts and support non-object storage based backends. ExtraBackends []string `yaml:"-"` + + // Not used internally, meant to allow callers to wrap Buckets + // created using this config + Middlewares []func(objstore.InstrumentedBucket) (objstore.InstrumentedBucket, error) `yaml:"-"` } // Returns the SupportedBackends for the package and any custom backends injected into the config. -func (cfg *StorageBackendConfig) SupportedBackends() []string { +func (cfg *Config) SupportedBackends() []string { return append(SupportedBackends, cfg.ExtraBackends...) } // RegisterFlags registers the backend storage config. -func (cfg *StorageBackendConfig) RegisterFlags(f *flag.FlagSet) { +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.RegisterFlagsWithPrefix("", f) } -func (cfg *StorageBackendConfig) RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir string, f *flag.FlagSet) { +func (cfg *Config) RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir string, f *flag.FlagSet) { cfg.GCS.RegisterFlagsWithPrefix(prefix, f) cfg.S3.RegisterFlagsWithPrefix(prefix, f) cfg.Azure.RegisterFlagsWithPrefix(prefix, f) cfg.Swift.RegisterFlagsWithPrefix(prefix, f) cfg.Filesystem.RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir, f) -} - -func (cfg *StorageBackendConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - cfg.RegisterFlagsWithPrefixAndDefaultDirectory(prefix, "", f) -} - -func (cfg *StorageBackendConfig) Validate() error { - if err := cfg.S3.Validate(); err != nil { - return err - } - - return cfg.NamedStores.Validate() -} - -// Config holds configuration for accessing long-term storage. 
-type Config struct { - StorageBackendConfig `yaml:",inline"` - StoragePrefix string `yaml:"storage_prefix"` - - // Not used internally, meant to allow callers to wrap Buckets - // created using this config - Middlewares []func(objstore.InstrumentedBucket) (objstore.InstrumentedBucket, error) `yaml:"-"` -} - -// RegisterFlags registers the backend storage config. -func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - cfg.RegisterFlagsWithPrefix("", f) -} - -func (cfg *Config) RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir string, f *flag.FlagSet) { - cfg.StorageBackendConfig.RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir, f) f.StringVar(&cfg.StoragePrefix, prefix+"storage-prefix", "", "Prefix for all objects stored in the backend storage. For simplicity, it may only contain digits and English alphabet letters.") } @@ -126,7 +99,20 @@ func (cfg *Config) Validate() error { } } - if err := cfg.StorageBackendConfig.Validate(); err != nil { + if err := cfg.S3.Validate(); err != nil { + return err + } + + return nil +} + +type ConfigWithNamedStores struct { + Config `yaml:",inline"` + NamedStores NamedStores `yaml:"named_stores"` +} + +func (cfg *ConfigWithNamedStores) Validate() error { + if err := cfg.Config.Validate(); err != nil { return err } @@ -134,82 +120,24 @@ func (cfg *Config) Validate() error { } // NewClient creates a new bucket client based on the configured backend -func NewClient(ctx context.Context, backend string, cfg Config, name string, logger log.Logger) (objstore.InstrumentedBucket, error) { +func NewClient(ctx context.Context, backend string, cfg Config, name string, logger log.Logger, rt http.RoundTripper) (objstore.InstrumentedBucket, error) { var ( - storeType = backend - namedStore bool - client objstore.Bucket err error ) - if st, ok := cfg.NamedStores.storeType[backend]; ok { - namedStore = true - storeType = st - } - // TODO: add support for other backends that loki already supports - switch storeType { + switch backend { 
case S3: - s3Cfg := cfg.S3 - if namedStore { - nsCfg, ok := cfg.NamedStores.S3[backend] - if !ok { - return nil, fmt.Errorf("Unrecognized named s3 storage config %s", backend) - } - - s3Cfg = (s3.Config)(nsCfg) - } - - client, err = s3.NewBucketClient(s3Cfg, name, logger) + client, err = s3.NewBucketClient(cfg.S3, name, logger, rt) case GCS: - gcsCfg := cfg.GCS - if namedStore { - nsCfg, ok := cfg.NamedStores.GCS[backend] - if !ok { - return nil, fmt.Errorf("Unrecognized named gcs storage config %s", backend) - } - - gcsCfg = (gcs.Config)(nsCfg) - } - - client, err = gcs.NewBucketClient(ctx, gcsCfg, name, logger) + client, err = gcs.NewBucketClient(ctx, cfg.GCS, name, logger, rt) case Azure: - azureCfg := cfg.Azure - if namedStore { - nsCfg, ok := cfg.NamedStores.Azure[backend] - if !ok { - return nil, fmt.Errorf("Unrecognized named azure storage config %s", backend) - } - - azureCfg = (azure.Config)(nsCfg) - } - - client, err = azure.NewBucketClient(azureCfg, name, logger) + client, err = azure.NewBucketClient(cfg.Azure, name, logger, rt) case Swift: - swiftCfg := cfg.Swift - if namedStore { - nsCfg, ok := cfg.NamedStores.Swift[backend] - if !ok { - return nil, fmt.Errorf("Unrecognized named swift storage config %s", backend) - } - - swiftCfg = (swift.Config)(nsCfg) - } - - client, err = swift.NewBucketClient(swiftCfg, name, logger) + client, err = swift.NewBucketClient(cfg.Swift, name, logger, rt) case Filesystem: - fsCfg := cfg.Filesystem - if namedStore { - nsCfg, ok := cfg.NamedStores.Filesystem[backend] - if !ok { - return nil, fmt.Errorf("Unrecognized named swift storage config %s", backend) - } - - fsCfg = (filesystem.Config)(nsCfg) - } - - client, err = filesystem.NewBucketClient(fsCfg) + client, err = filesystem.NewBucketClient(cfg.Filesystem) default: return nil, ErrUnsupportedStorageBackend } diff --git a/pkg/storage/bucket/client_test.go b/pkg/storage/bucket/client_test.go index a4bdb8f6e251c..95a38f5591767 100644 --- 
a/pkg/storage/bucket/client_test.go +++ b/pkg/storage/bucket/client_test.go @@ -76,7 +76,7 @@ func TestNewClient(t *testing.T) { require.NoError(t, err) // Instance a new bucket client from the config - bucketClient, err := NewClient(context.Background(), testData.backend, cfg, "test", util_log.Logger) + bucketClient, err := NewClient(context.Background(), testData.backend, cfg, "test", util_log.Logger, nil) require.Equal(t, testData.expectedErr, err) if testData.expectedErr == nil { diff --git a/pkg/storage/bucket/gcs/bucket_client.go b/pkg/storage/bucket/gcs/bucket_client.go index b5a8ce541e1d7..0dc17463735b3 100644 --- a/pkg/storage/bucket/gcs/bucket_client.go +++ b/pkg/storage/bucket/gcs/bucket_client.go @@ -2,6 +2,7 @@ package gcs import ( "context" + "net/http" "github.com/go-kit/log" "github.com/thanos-io/objstore" @@ -9,13 +10,12 @@ import ( ) // NewBucketClient creates a new GCS bucket client -func NewBucketClient(ctx context.Context, cfg Config, name string, logger log.Logger) (objstore.Bucket, error) { +func NewBucketClient(ctx context.Context, cfg Config, name string, logger log.Logger, rt http.RoundTripper) (objstore.Bucket, error) { // start with default http configs bucketConfig := gcs.DefaultConfig bucketConfig.Bucket = cfg.BucketName bucketConfig.ServiceAccount = cfg.ServiceAccount.String() bucketConfig.ChunkSizeBytes = cfg.ChunkBufferSize - bucketConfig.HTTPConfig.Transport = cfg.Transport - return gcs.NewBucketWithConfig(ctx, logger, bucketConfig, name, nil) + return gcs.NewBucketWithConfig(ctx, logger, bucketConfig, name, rt) } diff --git a/pkg/storage/bucket/gcs/config.go b/pkg/storage/bucket/gcs/config.go index a46c5030e4413..d217698606857 100644 --- a/pkg/storage/bucket/gcs/config.go +++ b/pkg/storage/bucket/gcs/config.go @@ -2,7 +2,6 @@ package gcs import ( "flag" - "net/http" "github.com/grafana/dskit/flagext" ) @@ -12,9 +11,6 @@ type Config struct { BucketName string `yaml:"bucket_name"` ServiceAccount flagext.Secret 
`yaml:"service_account" doc:"description_method=GCSServiceAccountLongDescription"` ChunkBufferSize int `yaml:"chunk_buffer_size"` - - // Allow upstream callers to inject a round tripper - Transport http.RoundTripper `yaml:"-"` } // RegisterFlags registers the flags for GCS storage diff --git a/pkg/storage/bucket/named_stores.go b/pkg/storage/bucket/named_stores.go index 56c322a53e947..ec9409633682d 100644 --- a/pkg/storage/bucket/named_stores.go +++ b/pkg/storage/bucket/named_stores.go @@ -88,11 +88,65 @@ func (ns *NamedStores) populateStoreType() error { return nil } +func (ns *NamedStores) LookupStoreType(name string) (string, bool) { + if ns == nil { + return "", false + } + + st, ok := ns.storeType[name] + return st, ok +} + func (ns *NamedStores) Exists(name string) bool { _, ok := ns.storeType[name] return ok } +// OverrideConfig overrides the store config with the named store config for the provided storeType and backend +func (ns *NamedStores) OverrideConfig(storeCfg *Config, backend, storeType string) error { + switch storeType { + case GCS: + nsCfg, ok := ns.GCS[backend] + if !ok { + return fmt.Errorf("Unrecognized named gcs storage config %s", backend) + } + + storeCfg.GCS = (gcs.Config)(nsCfg) + case S3: + nsCfg, ok := ns.S3[backend] + if !ok { + return fmt.Errorf("Unrecognized named s3 storage config %s", backend) + } + + storeCfg.S3 = (s3.Config)(nsCfg) + case Filesystem: + nsCfg, ok := ns.Filesystem[backend] + if !ok { + return fmt.Errorf("Unrecognized named filesystem storage config %s", backend) + } + + storeCfg.Filesystem = (filesystem.Config)(nsCfg) + case Azure: + nsCfg, ok := ns.Azure[backend] + if !ok { + return fmt.Errorf("Unrecognized named azure storage config %s", backend) + } + + storeCfg.Azure = (azure.Config)(nsCfg) + case Swift: + nsCfg, ok := ns.Swift[backend] + if !ok { + return fmt.Errorf("Unrecognized named swift storage config %s", backend) + } + + storeCfg.Swift = (swift.Config)(nsCfg) + default: + return fmt.Errorf("Unrecognized 
named storage type: %s", storeType) + } + + return nil +} + // Storage configs defined as Named stores don't get any defaults as they do not // register flags. To get around this we implement Unmarshaler interface that // assigns the defaults before calling unmarshal. diff --git a/pkg/storage/bucket/object_client_adapter.go b/pkg/storage/bucket/object_client_adapter.go index 094f0ad2ea7ac..7fd03f94ba09a 100644 --- a/pkg/storage/bucket/object_client_adapter.go +++ b/pkg/storage/bucket/object_client_adapter.go @@ -2,15 +2,20 @@ package bucket import ( "context" + "fmt" "io" "strings" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/objstore" "github.com/grafana/loki/v3/pkg/storage/chunk/client" + "github.com/grafana/loki/v3/pkg/storage/chunk/client/aws" + "github.com/grafana/loki/v3/pkg/storage/chunk/client/gcp" + "github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging" ) type ObjectClientAdapter struct { @@ -19,13 +24,40 @@ type ObjectClientAdapter struct { isRetryableErr func(err error) bool } -func NewObjectClientAdapter(bucket, hedgedBucket objstore.Bucket, logger log.Logger, opts ...ClientOptions) *ObjectClientAdapter { - if hedgedBucket == nil { - hedgedBucket = bucket +func NewObjectClient(ctx context.Context, backend string, cfg ConfigWithNamedStores, component string, hedgingCfg hedging.Config, logger log.Logger) (*ObjectClientAdapter, error) { + var ( + storeType = backend + storeCfg = cfg.Config + ) + + if st, ok := cfg.NamedStores.LookupStoreType(backend); ok { + storeType = st + // override config with values from named store config + if err := cfg.NamedStores.OverrideConfig(&storeCfg, backend, storeType); err != nil { + return nil, err + } + } + + b, err := NewClient(ctx, storeType, storeCfg, component, logger, nil) + if err != nil { + return nil, fmt.Errorf("create bucket: %w", err) + } + + hedgedBucket := b + if hedgingCfg.At != 0 { + 
hedgedTrasport, err := hedgingCfg.RoundTripperWithRegisterer(nil, prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer)) + if err != nil { + return nil, fmt.Errorf("create hedged transport: %w", err) + } + + b, err = NewClient(ctx, storeType, storeCfg, component, logger, hedgedTrasport) + if err != nil { + return nil, fmt.Errorf("create hedged bucket: %w", err) + } } o := &ObjectClientAdapter{ - bucket: bucket, + bucket: b, hedgedBucket: hedgedBucket, logger: log.With(logger, "component", "bucket_to_object_client_adapter"), // default to no retryable errors. Override with WithRetryableErrFunc @@ -34,19 +66,14 @@ func NewObjectClientAdapter(bucket, hedgedBucket objstore.Bucket, logger log.Log }, } - for _, opt := range opts { - opt(o) + switch storeType { + case GCS: + o.isRetryableErr = gcp.IsRetryableErr + case S3: + o.isRetryableErr = aws.IsRetryableErr } - return o -} - -type ClientOptions func(*ObjectClientAdapter) - -func WithRetryableErrFunc(f func(err error) bool) ClientOptions { - return func(o *ObjectClientAdapter) { - o.isRetryableErr = f - } + return o, nil } func (o *ObjectClientAdapter) Stop() { diff --git a/pkg/storage/bucket/object_client_adapter_test.go b/pkg/storage/bucket/object_client_adapter_test.go index 1ce6de26856bf..e51b53456337d 100644 --- a/pkg/storage/bucket/object_client_adapter_test.go +++ b/pkg/storage/bucket/object_client_adapter_test.go @@ -6,10 +6,12 @@ import ( "sort" "testing" + "github.com/go-kit/log" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/storage/bucket/filesystem" "github.com/grafana/loki/v3/pkg/storage/chunk/client" + "github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging" ) func TestObjectClientAdapter_List(t *testing.T) { @@ -95,8 +97,12 @@ func TestObjectClientAdapter_List(t *testing.T) { require.NoError(t, newBucket.Upload(context.Background(), "depply/nested/folder/b", buff)) require.NoError(t, newBucket.Upload(context.Background(), "depply/nested/folder/c", buff)) 
- client := NewObjectClientAdapter(newBucket, nil, nil) - client.bucket = newBucket + client, err := NewObjectClient(context.Background(), "filesystem", ConfigWithNamedStores{ + Config: Config{ + Filesystem: config, + }, + }, "test", hedging.Config{}, log.NewNopLogger()) + require.NoError(t, err) storageObj, storageCommonPref, err := client.List(context.Background(), tt.prefix, tt.delimiter) if tt.wantErr != nil { diff --git a/pkg/storage/bucket/s3/bucket_client.go b/pkg/storage/bucket/s3/bucket_client.go index 5d904d8e5fe9b..8aa7ab7b9de8b 100644 --- a/pkg/storage/bucket/s3/bucket_client.go +++ b/pkg/storage/bucket/s3/bucket_client.go @@ -1,6 +1,8 @@ package s3 import ( + "net/http" + "github.com/go-kit/log" "github.com/prometheus/common/model" "github.com/thanos-io/objstore" @@ -14,23 +16,13 @@ const ( ) // NewBucketClient creates a new S3 bucket client -func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucket, error) { - s3Cfg, err := newS3Config(cfg) - if err != nil { - return nil, err - } - - return s3.NewBucketWithConfig(logger, s3Cfg, name, nil) -} - -// NewBucketReaderClient creates a new S3 bucket client -func NewBucketReaderClient(cfg Config, name string, logger log.Logger) (objstore.BucketReader, error) { +func NewBucketClient(cfg Config, name string, logger log.Logger, rt http.RoundTripper) (objstore.Bucket, error) { s3Cfg, err := newS3Config(cfg) if err != nil { return nil, err } - return s3.NewBucketWithConfig(logger, s3Cfg, name, nil) + return s3.NewBucketWithConfig(logger, s3Cfg, name, rt) } func newS3Config(cfg Config) (s3.Config, error) { @@ -70,7 +62,6 @@ func newS3Config(cfg Config) (s3.Config, error) { MaxIdleConns: cfg.HTTP.MaxIdleConns, MaxIdleConnsPerHost: cfg.HTTP.MaxIdleConnsPerHost, MaxConnsPerHost: cfg.HTTP.MaxConnsPerHost, - Transport: cfg.HTTP.Transport, TLSConfig: exthttp.TLSConfig{ CAFile: cfg.HTTP.TLSConfig.CAPath, CertFile: cfg.HTTP.TLSConfig.CertPath, diff --git a/pkg/storage/bucket/s3/config.go 
b/pkg/storage/bucket/s3/config.go index 792f93f752b32..3882b4e6063c3 100644 --- a/pkg/storage/bucket/s3/config.go +++ b/pkg/storage/bucket/s3/config.go @@ -4,7 +4,6 @@ import ( "encoding/json" "flag" "fmt" - "net/http" "slices" "strings" "time" @@ -66,9 +65,6 @@ type HTTPConfig struct { MaxIdleConnsPerHost int `yaml:"max_idle_connections_per_host" category:"advanced"` MaxConnsPerHost int `yaml:"max_connections_per_host" category:"advanced"` - // Allow upstream callers to inject a round tripper - Transport http.RoundTripper `yaml:"-"` - TLSConfig TLSConfig `yaml:",inline"` } diff --git a/pkg/storage/bucket/sse_bucket_client_test.go b/pkg/storage/bucket/sse_bucket_client_test.go index 697e8837a2f32..5bba133fd4f86 100644 --- a/pkg/storage/bucket/sse_bucket_client_test.go +++ b/pkg/storage/bucket/sse_bucket_client_test.go @@ -56,7 +56,7 @@ func TestSSEBucketClient_Upload_ShouldInjectCustomSSEConfig(t *testing.T) { Insecure: true, } - s3Client, err := s3.NewBucketClient(s3Cfg, "test", log.NewNopLogger()) + s3Client, err := s3.NewBucketClient(s3Cfg, "test", log.NewNopLogger(), nil) require.NoError(t, err) // Configure the config provider with NO KMS key ID. 
diff --git a/pkg/storage/bucket/swift/bucket_client.go b/pkg/storage/bucket/swift/bucket_client.go index b36c07e506b87..dbfaace9e789e 100644 --- a/pkg/storage/bucket/swift/bucket_client.go +++ b/pkg/storage/bucket/swift/bucket_client.go @@ -1,6 +1,8 @@ package swift import ( + "net/http" + "github.com/go-kit/log" "github.com/prometheus/common/model" "github.com/thanos-io/objstore" @@ -9,7 +11,7 @@ import ( ) // NewBucketClient creates a new Swift bucket client -func NewBucketClient(cfg Config, _ string, logger log.Logger) (objstore.Bucket, error) { +func NewBucketClient(cfg Config, _ string, logger log.Logger, rt http.RoundTripper) (objstore.Bucket, error) { bucketConfig := swift.Config{ AuthVersion: cfg.AuthVersion, AuthUrl: cfg.AuthURL, @@ -42,5 +44,5 @@ func NewBucketClient(cfg Config, _ string, logger log.Logger) (objstore.Bucket, return nil, err } - return swift.NewContainer(logger, serialized, nil) + return swift.NewContainer(logger, serialized, rt) } diff --git a/pkg/storage/chunk/client/aws/s3_thanos_object_client.go b/pkg/storage/chunk/client/aws/s3_thanos_object_client.go deleted file mode 100644 index e00ded920d552..0000000000000 --- a/pkg/storage/chunk/client/aws/s3_thanos_object_client.go +++ /dev/null @@ -1,44 +0,0 @@ -package aws - -import ( - "context" - - "github.com/go-kit/log" - "github.com/prometheus/client_golang/prometheus" - "github.com/thanos-io/objstore" - - "github.com/grafana/loki/v3/pkg/storage/bucket" - "github.com/grafana/loki/v3/pkg/storage/chunk/client" - "github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging" -) - -func NewS3ThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedgingCfg hedging.Config) (client.ObjectClient, error) { - b, err := newS3ThanosObjectClient(ctx, cfg, component, logger, false, hedgingCfg) - if err != nil { - return nil, err - } - - var hedged objstore.Bucket - if hedgingCfg.At != 0 { - hedged, err = newS3ThanosObjectClient(ctx, cfg, component, logger, true, 
hedgingCfg) - if err != nil { - return nil, err - } - } - - o := bucket.NewObjectClientAdapter(b, hedged, logger, bucket.WithRetryableErrFunc(IsRetryableErr)) - return o, nil -} - -func newS3ThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedging bool, hedgingCfg hedging.Config) (objstore.Bucket, error) { - if hedging { - hedgedTrasport, err := hedgingCfg.RoundTripperWithRegisterer(nil, prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer)) - if err != nil { - return nil, err - } - - cfg.S3.HTTP.Transport = hedgedTrasport - } - - return bucket.NewClient(ctx, bucket.S3, cfg, component, logger) -} diff --git a/pkg/storage/chunk/client/azure/blob_storage_thanos_object_client.go b/pkg/storage/chunk/client/azure/blob_storage_thanos_object_client.go deleted file mode 100644 index 4bf2137433064..0000000000000 --- a/pkg/storage/chunk/client/azure/blob_storage_thanos_object_client.go +++ /dev/null @@ -1,44 +0,0 @@ -package azure - -import ( - "context" - - "github.com/go-kit/log" - "github.com/prometheus/client_golang/prometheus" - "github.com/thanos-io/objstore" - - "github.com/grafana/loki/v3/pkg/storage/bucket" - "github.com/grafana/loki/v3/pkg/storage/chunk/client" - "github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging" -) - -// NewBlobStorageObjectClient makes a new BlobStorage-backed ObjectClient. 
-func NewBlobStorageThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedgingCfg hedging.Config) (client.ObjectClient, error) { - b, err := newBlobStorageThanosObjClient(ctx, cfg, component, logger, false, hedgingCfg) - if err != nil { - return nil, err - } - - var hedged objstore.Bucket - if hedgingCfg.At != 0 { - hedged, err = newBlobStorageThanosObjClient(ctx, cfg, component, logger, true, hedgingCfg) - if err != nil { - return nil, err - } - } - - return bucket.NewObjectClientAdapter(b, hedged, logger), nil -} - -func newBlobStorageThanosObjClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedging bool, hedgingCfg hedging.Config) (objstore.Bucket, error) { - if hedging { - hedgedTrasport, err := hedgingCfg.RoundTripperWithRegisterer(nil, prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer)) - if err != nil { - return nil, err - } - - cfg.Azure.Transport = hedgedTrasport - } - - return bucket.NewClient(ctx, bucket.Azure, cfg, component, logger) -} diff --git a/pkg/storage/chunk/client/gcp/gcs_thanos_object_client.go b/pkg/storage/chunk/client/gcp/gcs_thanos_object_client.go deleted file mode 100644 index b4190be2d6943..0000000000000 --- a/pkg/storage/chunk/client/gcp/gcs_thanos_object_client.go +++ /dev/null @@ -1,44 +0,0 @@ -package gcp - -import ( - "context" - - "github.com/go-kit/log" - "github.com/prometheus/client_golang/prometheus" - "github.com/thanos-io/objstore" - - "github.com/grafana/loki/v3/pkg/storage/bucket" - "github.com/grafana/loki/v3/pkg/storage/chunk/client" - "github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging" -) - -func NewGCSThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedgingCfg hedging.Config) (client.ObjectClient, error) { - b, err := newGCSThanosObjectClient(ctx, cfg, component, logger, false, hedgingCfg) - if err != nil { - return nil, err - } - - var hedged objstore.Bucket 
- if hedgingCfg.At != 0 { - hedged, err = newGCSThanosObjectClient(ctx, cfg, component, logger, true, hedgingCfg) - if err != nil { - return nil, err - } - } - - o := bucket.NewObjectClientAdapter(b, hedged, logger, bucket.WithRetryableErrFunc(IsRetryableErr)) - return o, nil -} - -func newGCSThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedging bool, hedgingCfg hedging.Config) (objstore.Bucket, error) { - if hedging { - hedgedTrasport, err := hedgingCfg.RoundTripperWithRegisterer(nil, prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer)) - if err != nil { - return nil, err - } - - cfg.GCS.Transport = hedgedTrasport - } - - return bucket.NewClient(ctx, bucket.GCS, cfg, component, logger) -} diff --git a/pkg/storage/factory.go b/pkg/storage/factory.go index 7f4046a47d868..a62eba2b23f53 100644 --- a/pkg/storage/factory.go +++ b/pkg/storage/factory.go @@ -295,8 +295,8 @@ type Config struct { DisableBroadIndexQueries bool `yaml:"disable_broad_index_queries"` MaxParallelGetChunk int `yaml:"max_parallel_get_chunk"` - UseThanosObjstore bool `yaml:"use_thanos_objstore" doc:"hidden"` - ObjectStore bucket.Config `yaml:"object_store" doc:"hidden"` + UseThanosObjstore bool `yaml:"use_thanos_objstore" doc:"hidden"` + ObjectStore bucket.ConfigWithNamedStores `yaml:"object_store" doc:"hidden"` MaxChunkBatchSize int `yaml:"max_chunk_batch_size"` BoltDBShipperConfig boltdb.IndexCfg `yaml:"boltdb_shipper" doc:"description=Configures storing index in an Object Store (GCS/S3/Azure/Swift/COS/Filesystem) in the form of boltdb files. Required fields only required when boltdb-shipper is defined in config."` @@ -611,12 +611,16 @@ func (c *ClientMetrics) Unregister() { // NewObjectClient makes a new StorageClient with the prefix in the front. 
func NewObjectClient(name, component string, cfg Config, clientMetrics ClientMetrics) (client.ObjectClient, error) { + if cfg.UseThanosObjstore { + return bucket.NewObjectClient(context.Background(), name, cfg.ObjectStore, component, cfg.Hedging, util_log.Logger) + } + actual, err := internalNewObjectClient(name, component, cfg, clientMetrics) if err != nil { return nil, err } - if cfg.UseThanosObjstore || cfg.ObjectPrefix == "" { + if cfg.ObjectPrefix == "" { return actual, nil } else { prefix := strings.Trim(cfg.ObjectPrefix, "/") + "/" @@ -655,9 +659,6 @@ func internalNewObjectClient(storeName, component string, cfg Config, clientMetr s3Cfg.BackoffConfig.MaxRetries = 1 } - if cfg.UseThanosObjstore { - return aws.NewS3ThanosObjectClient(context.Background(), cfg.ObjectStore, component, util_log.Logger, cfg.Hedging) - } return aws.NewS3ObjectClient(s3Cfg, cfg.Hedging) case types.StorageTypeAlibabaCloud: @@ -687,9 +688,6 @@ func internalNewObjectClient(storeName, component string, cfg Config, clientMetr if cfg.CongestionControl.Enabled { gcsCfg.EnableRetries = false } - if cfg.UseThanosObjstore { - return gcp.NewGCSThanosObjectClient(context.Background(), cfg.ObjectStore, component, util_log.Logger, cfg.Hedging) - } return gcp.NewGCSObjectClient(context.Background(), gcsCfg, cfg.Hedging) case types.StorageTypeAzure: @@ -701,9 +699,6 @@ func internalNewObjectClient(storeName, component string, cfg Config, clientMetr } azureCfg = (azure.BlobStorageConfig)(nsCfg) } - if cfg.UseThanosObjstore { - return azure.NewBlobStorageThanosObjectClient(context.Background(), cfg.ObjectStore, component, util_log.Logger, cfg.Hedging) - } return azure.NewBlobStorage(&azureCfg, clientMetrics.AzureMetrics, cfg.Hedging) case types.StorageTypeSwift: From da20d380cbef3f304e9c0a7fa2de6e92eea0eb77 Mon Sep 17 00:00:00 2001 From: Ashwanth Goli Date: Wed, 30 Oct 2024 16:58:37 +0530 Subject: [PATCH 03/13] add tests + nits --- pkg/storage/bucket/named_stores.go | 8 +- 
pkg/storage/bucket/named_stores_test.go | 92 +++++++++++++++++++++ pkg/storage/bucket/object_client_adapter.go | 8 +- 3 files changed, 98 insertions(+), 10 deletions(-) create mode 100644 pkg/storage/bucket/named_stores_test.go diff --git a/pkg/storage/bucket/named_stores.go b/pkg/storage/bucket/named_stores.go index ec9409633682d..1048191d92eed 100644 --- a/pkg/storage/bucket/named_stores.go +++ b/pkg/storage/bucket/named_stores.go @@ -2,6 +2,7 @@ package bucket import ( "fmt" + "slices" "github.com/grafana/loki/v3/pkg/storage/bucket/azure" "github.com/grafana/loki/v3/pkg/storage/bucket/filesystem" @@ -38,8 +39,7 @@ func (ns *NamedStores) populateStoreType() error { ns.storeType = make(map[string]string) checkForDuplicates := func(name string) error { - switch name { - case S3, GCS, Azure, Swift, Filesystem: + if slices.Contains(SupportedBackends, name) { return fmt.Errorf("named store %q should not match with the name of a predefined storage type", name) } @@ -89,10 +89,6 @@ func (ns *NamedStores) populateStoreType() error { } func (ns *NamedStores) LookupStoreType(name string) (string, bool) { - if ns == nil { - return "", false - } - st, ok := ns.storeType[name] return st, ok } diff --git a/pkg/storage/bucket/named_stores_test.go b/pkg/storage/bucket/named_stores_test.go new file mode 100644 index 0000000000000..224831335ec43 --- /dev/null +++ b/pkg/storage/bucket/named_stores_test.go @@ -0,0 +1,92 @@ +package bucket + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/storage/bucket/gcs" +) + +func TestNamedStores_populateStoreType(t *testing.T) { + t.Run("found duplicates", func(t *testing.T) { + ns := NamedStores{ + S3: map[string]NamedS3StorageConfig{ + "store-1": {}, + "store-2": {}, + }, + GCS: map[string]NamedGCSStorageConfig{ + "store-1": {}, + }, + } + + err := ns.populateStoreType() + require.ErrorContains(t, err, `named store "store-1" is already defined under`) + + }) + + t.Run("illegal store name", 
func(t *testing.T) { + ns := NamedStores{ + GCS: map[string]NamedGCSStorageConfig{ + "s3": {}, + }, + } + + err := ns.populateStoreType() + require.ErrorContains(t, err, `named store "s3" should not match with the name of a predefined storage type`) + + }) + + t.Run("lookup populated entries", func(t *testing.T) { + ns := NamedStores{ + S3: map[string]NamedS3StorageConfig{ + "store-1": {}, + "store-2": {}, + }, + GCS: map[string]NamedGCSStorageConfig{ + "store-3": {}, + }, + } + + err := ns.populateStoreType() + require.NoError(t, err) + + storeType, ok := ns.LookupStoreType("store-1") + require.True(t, ok) + require.Equal(t, S3, storeType) + + storeType, ok = ns.LookupStoreType("store-2") + require.True(t, ok) + require.Equal(t, S3, storeType) + + storeType, ok = ns.LookupStoreType("store-3") + require.True(t, ok) + require.Equal(t, GCS, storeType) + + _, ok = ns.LookupStoreType("store-4") + require.False(t, ok) + }) +} + +func TestNamedStores_OverrideConfig(t *testing.T) { + namedStoreCfg := NamedStores{ + GCS: map[string]NamedGCSStorageConfig{ + "store-1": { + BucketName: "bar", + ChunkBufferSize: 100, + }, + "store-2": { + BucketName: "baz", + }, + }, + } + + storeCfg := Config{ + GCS: gcs.Config{ + BucketName: "foo", + }, + } + namedStoreCfg.OverrideConfig(&storeCfg, "store-1", GCS) + require.Equal(t, "bar", storeCfg.GCS.BucketName) + require.Equal(t, 100, storeCfg.GCS.ChunkBufferSize) +} diff --git a/pkg/storage/bucket/object_client_adapter.go b/pkg/storage/bucket/object_client_adapter.go index 7fd03f94ba09a..4c01b88dea2ad 100644 --- a/pkg/storage/bucket/object_client_adapter.go +++ b/pkg/storage/bucket/object_client_adapter.go @@ -38,26 +38,26 @@ func NewObjectClient(ctx context.Context, backend string, cfg ConfigWithNamedSto } } - b, err := NewClient(ctx, storeType, storeCfg, component, logger, nil) + bucket, err := NewClient(ctx, storeType, storeCfg, component, logger, nil) if err != nil { return nil, fmt.Errorf("create bucket: %w", err) } - hedgedBucket 
:= b + hedgedBucket := bucket if hedgingCfg.At != 0 { hedgedTrasport, err := hedgingCfg.RoundTripperWithRegisterer(nil, prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer)) if err != nil { return nil, fmt.Errorf("create hedged transport: %w", err) } - b, err = NewClient(ctx, storeType, storeCfg, component, logger, hedgedTrasport) + bucket, err = NewClient(ctx, storeType, storeCfg, component, logger, hedgedTrasport) if err != nil { return nil, fmt.Errorf("create hedged bucket: %w", err) } } o := &ObjectClientAdapter{ - bucket: b, + bucket: bucket, hedgedBucket: hedgedBucket, logger: log.With(logger, "component", "bucket_to_object_client_adapter"), // default to no retryable errors. Override with WithRetryableErrFunc From a617782b1ad6fb572e217155e8d641545b6d7ac7 Mon Sep 17 00:00:00 2001 From: Ashwanth Goli Date: Wed, 30 Oct 2024 17:07:46 +0530 Subject: [PATCH 04/13] golangci-lint fixes --- pkg/storage/bucket/named_stores_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/storage/bucket/named_stores_test.go b/pkg/storage/bucket/named_stores_test.go index 224831335ec43..8881a5f813de5 100644 --- a/pkg/storage/bucket/named_stores_test.go +++ b/pkg/storage/bucket/named_stores_test.go @@ -86,7 +86,8 @@ func TestNamedStores_OverrideConfig(t *testing.T) { BucketName: "foo", }, } - namedStoreCfg.OverrideConfig(&storeCfg, "store-1", GCS) + err := namedStoreCfg.OverrideConfig(&storeCfg, "store-1", GCS) + require.NoError(t, err) require.Equal(t, "bar", storeCfg.GCS.BucketName) require.Equal(t, 100, storeCfg.GCS.ChunkBufferSize) } From 0afe96dd729a76ebfbb14573b7a1b366e625d021 Mon Sep 17 00:00:00 2001 From: Ashwanth Goli Date: Tue, 12 Nov 2024 15:58:12 +0530 Subject: [PATCH 05/13] remove rt from bucket client contract --- pkg/storage/bucket/azure/config.go | 4 ++++ pkg/storage/bucket/gcs/bucket_client.go | 5 ++--- pkg/storage/bucket/s3/bucket_client.go | 6 ++---- pkg/storage/bucket/s3/config.go | 4 ++++ 
pkg/storage/bucket/swift/bucket_client.go | 4 +--- 5 files changed, 13 insertions(+), 10 deletions(-) diff --git a/pkg/storage/bucket/azure/config.go b/pkg/storage/bucket/azure/config.go index cdbfe52ee9252..ac8037b6b7819 100644 --- a/pkg/storage/bucket/azure/config.go +++ b/pkg/storage/bucket/azure/config.go @@ -2,6 +2,7 @@ package azure import ( "flag" + "net/http" "github.com/grafana/dskit/flagext" ) @@ -15,6 +16,9 @@ type Config struct { Endpoint string `yaml:"endpoint_suffix"` MaxRetries int `yaml:"max_retries"` UserAssignedID string `yaml:"user_assigned_id"` + + // Allow upstream callers to inject a round tripper + Transport http.RoundTripper `yaml:"-"` } // RegisterFlags registers the flags for Azure storage diff --git a/pkg/storage/bucket/gcs/bucket_client.go b/pkg/storage/bucket/gcs/bucket_client.go index 092218558048d..950202ea540e9 100644 --- a/pkg/storage/bucket/gcs/bucket_client.go +++ b/pkg/storage/bucket/gcs/bucket_client.go @@ -2,7 +2,6 @@ package gcs import ( "context" - "net/http" "github.com/go-kit/log" "github.com/thanos-io/objstore" @@ -10,7 +9,7 @@ import ( ) // NewBucketClient creates a new GCS bucket client -func NewBucketClient(ctx context.Context, cfg Config, name string, logger log.Logger, rt http.RoundTripper) (objstore.Bucket, error) { +func NewBucketClient(ctx context.Context, cfg Config, name string, logger log.Logger) (objstore.Bucket, error) { // start with default http configs bucketConfig := gcs.DefaultConfig bucketConfig.Bucket = cfg.BucketName @@ -19,5 +18,5 @@ func NewBucketClient(ctx context.Context, cfg Config, name string, logger log.Lo bucketConfig.MaxRetries = cfg.MaxRetries bucketConfig.HTTPConfig.Transport = cfg.Transport - return gcs.NewBucketWithConfig(ctx, logger, bucketConfig, name, rt) + return gcs.NewBucketWithConfig(ctx, logger, bucketConfig, name, nil) } diff --git a/pkg/storage/bucket/s3/bucket_client.go b/pkg/storage/bucket/s3/bucket_client.go index 605e9e2a99d86..b47036a59dae4 100644 --- 
a/pkg/storage/bucket/s3/bucket_client.go +++ b/pkg/storage/bucket/s3/bucket_client.go @@ -1,8 +1,6 @@ package s3 import ( - "net/http" - "github.com/go-kit/log" "github.com/prometheus/common/model" "github.com/thanos-io/objstore" @@ -16,13 +14,13 @@ const ( ) // NewBucketClient creates a new S3 bucket client -func NewBucketClient(cfg Config, name string, logger log.Logger, rt http.RoundTripper) (objstore.Bucket, error) { +func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucket, error) { s3Cfg, err := newS3Config(cfg) if err != nil { return nil, err } - return s3.NewBucketWithConfig(logger, s3Cfg, name, rt) + return s3.NewBucketWithConfig(logger, s3Cfg, name, nil) } func newS3Config(cfg Config) (s3.Config, error) { diff --git a/pkg/storage/bucket/s3/config.go b/pkg/storage/bucket/s3/config.go index 6de8133330ced..67c412de6d606 100644 --- a/pkg/storage/bucket/s3/config.go +++ b/pkg/storage/bucket/s3/config.go @@ -4,6 +4,7 @@ import ( "encoding/json" "flag" "fmt" + "net/http" "slices" "strings" "time" @@ -65,6 +66,9 @@ type HTTPConfig struct { MaxIdleConnsPerHost int `yaml:"max_idle_connections_per_host" category:"advanced"` MaxConnsPerHost int `yaml:"max_connections_per_host" category:"advanced"` + // Allow upstream callers to inject a round tripper + Transport http.RoundTripper `yaml:"-"` + TLSConfig TLSConfig `yaml:",inline"` } diff --git a/pkg/storage/bucket/swift/bucket_client.go b/pkg/storage/bucket/swift/bucket_client.go index d725960f1d774..28f3c922c4254 100644 --- a/pkg/storage/bucket/swift/bucket_client.go +++ b/pkg/storage/bucket/swift/bucket_client.go @@ -1,8 +1,6 @@ package swift import ( - "net/http" - "github.com/go-kit/log" "github.com/prometheus/common/model" "github.com/thanos-io/objstore" @@ -11,7 +9,7 @@ import ( ) // NewBucketClient creates a new Swift bucket client -func NewBucketClient(cfg Config, _ string, logger log.Logger, rt http.RoundTripper) (objstore.Bucket, error) { +func NewBucketClient(cfg Config, _ string, 
logger log.Logger) (objstore.Bucket, error) { bucketConfig := swift.Config{ AuthVersion: cfg.AuthVersion, AuthUrl: cfg.AuthURL, From d4cc37697da028e2ec9f13aa39eca76504d0f4c1 Mon Sep 17 00:00:00 2001 From: Ashwanth Goli Date: Tue, 12 Nov 2024 16:01:01 +0530 Subject: [PATCH 06/13] fixup! remove rt from bucket client contract --- pkg/ruler/base/storage.go | 2 +- pkg/storage/bucket/client.go | 10 +++++----- pkg/storage/bucket/client_test.go | 2 +- pkg/storage/bucket/s3/bucket_client.go | 11 +++++++++++ pkg/storage/bucket/sse_bucket_client_test.go | 2 +- 5 files changed, 19 insertions(+), 8 deletions(-) diff --git a/pkg/ruler/base/storage.go b/pkg/ruler/base/storage.go index 7a1222a7a9fae..068718f5491a6 100644 --- a/pkg/ruler/base/storage.go +++ b/pkg/ruler/base/storage.go @@ -127,7 +127,7 @@ func NewRuleStore(ctx context.Context, cfg rulestore.Config, cfgProvider bucket. return local.NewLocalRulesClient(cfg.Local, loader) } - bucketClient, err := bucket.NewClient(ctx, cfg.Backend, cfg.Config, "ruler-storage", logger, nil) + bucketClient, err := bucket.NewClient(ctx, cfg.Backend, cfg.Config, "ruler-storage", logger) if err != nil { return nil, err } diff --git a/pkg/storage/bucket/client.go b/pkg/storage/bucket/client.go index ed7f8b5b4cf27..53613f9b06b13 100644 --- a/pkg/storage/bucket/client.go +++ b/pkg/storage/bucket/client.go @@ -159,7 +159,7 @@ func (cfg *Config) configureTransport(backend string, rt http.RoundTripper) erro } // NewClient creates a new bucket client based on the configured backend -func NewClient(ctx context.Context, backend string, cfg Config, name string, logger log.Logger, rt http.RoundTripper) (objstore.InstrumentedBucket, error) { +func NewClient(ctx context.Context, backend string, cfg Config, name string, logger log.Logger) (objstore.InstrumentedBucket, error) { var ( client objstore.Bucket err error @@ -168,13 +168,13 @@ func NewClient(ctx context.Context, backend string, cfg Config, name string, log // TODO: add support for other backends 
that loki already supports switch backend { case S3: - client, err = s3.NewBucketClient(cfg.S3, name, logger, rt) + client, err = s3.NewBucketClient(cfg.S3, name, logger) case GCS: - client, err = gcs.NewBucketClient(ctx, cfg.GCS, name, logger, rt) + client, err = gcs.NewBucketClient(ctx, cfg.GCS, name, logger) case Azure: - client, err = azure.NewBucketClient(cfg.Azure, name, logger, rt) + client, err = azure.NewBucketClient(cfg.Azure, name, logger) case Swift: - client, err = swift.NewBucketClient(cfg.Swift, name, logger, rt) + client, err = swift.NewBucketClient(cfg.Swift, name, logger) case Filesystem: client, err = filesystem.NewBucketClient(cfg.Filesystem) default: diff --git a/pkg/storage/bucket/client_test.go b/pkg/storage/bucket/client_test.go index 95a38f5591767..a4bdb8f6e251c 100644 --- a/pkg/storage/bucket/client_test.go +++ b/pkg/storage/bucket/client_test.go @@ -76,7 +76,7 @@ func TestNewClient(t *testing.T) { require.NoError(t, err) // Instance a new bucket client from the config - bucketClient, err := NewClient(context.Background(), testData.backend, cfg, "test", util_log.Logger, nil) + bucketClient, err := NewClient(context.Background(), testData.backend, cfg, "test", util_log.Logger) require.Equal(t, testData.expectedErr, err) if testData.expectedErr == nil { diff --git a/pkg/storage/bucket/s3/bucket_client.go b/pkg/storage/bucket/s3/bucket_client.go index b47036a59dae4..381f3436f53d4 100644 --- a/pkg/storage/bucket/s3/bucket_client.go +++ b/pkg/storage/bucket/s3/bucket_client.go @@ -23,6 +23,16 @@ func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucke return s3.NewBucketWithConfig(logger, s3Cfg, name, nil) } +// NewBucketReaderClient creates a new S3 bucket client +func NewBucketReaderClient(cfg Config, name string, logger log.Logger) (objstore.BucketReader, error) { + s3Cfg, err := newS3Config(cfg) + if err != nil { + return nil, err + } + + return s3.NewBucketWithConfig(logger, s3Cfg, name, nil) +} + func 
newS3Config(cfg Config) (s3.Config, error) { sseCfg, err := cfg.SSE.BuildThanosConfig() if err != nil { @@ -60,6 +70,7 @@ func newS3Config(cfg Config) (s3.Config, error) { MaxIdleConns: cfg.HTTP.MaxIdleConns, MaxIdleConnsPerHost: cfg.HTTP.MaxIdleConnsPerHost, MaxConnsPerHost: cfg.HTTP.MaxConnsPerHost, + Transport: cfg.HTTP.Transport, TLSConfig: exthttp.TLSConfig{ CAFile: cfg.HTTP.TLSConfig.CAPath, CertFile: cfg.HTTP.TLSConfig.CertPath, diff --git a/pkg/storage/bucket/sse_bucket_client_test.go b/pkg/storage/bucket/sse_bucket_client_test.go index 5bba133fd4f86..697e8837a2f32 100644 --- a/pkg/storage/bucket/sse_bucket_client_test.go +++ b/pkg/storage/bucket/sse_bucket_client_test.go @@ -56,7 +56,7 @@ func TestSSEBucketClient_Upload_ShouldInjectCustomSSEConfig(t *testing.T) { Insecure: true, } - s3Client, err := s3.NewBucketClient(s3Cfg, "test", log.NewNopLogger(), nil) + s3Client, err := s3.NewBucketClient(s3Cfg, "test", log.NewNopLogger()) require.NoError(t, err) // Configure the config provider with NO KMS key ID. 
From f29a9b3a50a0c85e6e3104e057d4d93fc7363dc4 Mon Sep 17 00:00:00 2001 From: Ashwanth Goli Date: Tue, 12 Nov 2024 17:06:16 +0530 Subject: [PATCH 07/13] minor changes --- pkg/storage/bucket/named_stores.go | 29 +++++++++++-------- pkg/storage/bucket/named_stores_test.go | 3 +- pkg/storage/bucket/object_client_adapter.go | 6 ++-- .../bucket/object_client_adapter_test.go | 2 +- 4 files changed, 23 insertions(+), 17 deletions(-) diff --git a/pkg/storage/bucket/named_stores.go b/pkg/storage/bucket/named_stores.go index 1048191d92eed..afcff982ae5c8 100644 --- a/pkg/storage/bucket/named_stores.go +++ b/pkg/storage/bucket/named_stores.go @@ -98,41 +98,46 @@ func (ns *NamedStores) Exists(name string) bool { return ok } -// OverrideConfig overrides the store config with the named store config for the provided storeType and backend -func (ns *NamedStores) OverrideConfig(storeCfg *Config, backend, storeType string) error { +// OverrideConfig overrides the store config with the named store config +func (ns *NamedStores) OverrideConfig(storeCfg *Config, namedStore string) error { + storeType, ok := ns.LookupStoreType(namedStore) + if !ok { + return fmt.Errorf("Unrecognized named storage config %s", namedStore) + } + switch storeType { case GCS: - nsCfg, ok := ns.GCS[backend] + nsCfg, ok := ns.GCS[namedStore] if !ok { - return fmt.Errorf("Unrecognized named s3 storage config %s", backend) + return fmt.Errorf("Unrecognized named s3 storage config %s", namedStore) } storeCfg.GCS = (gcs.Config)(nsCfg) case S3: - nsCfg, ok := ns.S3[backend] + nsCfg, ok := ns.S3[namedStore] if !ok { - return fmt.Errorf("Unrecognized named gcs storage config %s", backend) + return fmt.Errorf("Unrecognized named gcs storage config %s", namedStore) } storeCfg.S3 = (s3.Config)(nsCfg) case Filesystem: - nsCfg, ok := ns.Filesystem[backend] + nsCfg, ok := ns.Filesystem[namedStore] if !ok { - return fmt.Errorf("Unrecognized named filesystem storage config %s", backend) + return fmt.Errorf("Unrecognized named 
filesystem storage config %s", namedStore) } storeCfg.Filesystem = (filesystem.Config)(nsCfg) case Azure: - nsCfg, ok := ns.Azure[backend] + nsCfg, ok := ns.Azure[namedStore] if !ok { - return fmt.Errorf("Unrecognized named azure storage config %s", backend) + return fmt.Errorf("Unrecognized named azure storage config %s", namedStore) } storeCfg.Azure = (azure.Config)(nsCfg) case Swift: - nsCfg, ok := ns.Swift[backend] + nsCfg, ok := ns.Swift[namedStore] if !ok { - return fmt.Errorf("Unrecognized named swift storage config %s", backend) + return fmt.Errorf("Unrecognized named swift storage config %s", namedStore) } storeCfg.Swift = (swift.Config)(nsCfg) diff --git a/pkg/storage/bucket/named_stores_test.go b/pkg/storage/bucket/named_stores_test.go index 8881a5f813de5..6af29ef4f825a 100644 --- a/pkg/storage/bucket/named_stores_test.go +++ b/pkg/storage/bucket/named_stores_test.go @@ -80,13 +80,14 @@ func TestNamedStores_OverrideConfig(t *testing.T) { }, }, } + namedStoreCfg.populateStoreType() storeCfg := Config{ GCS: gcs.Config{ BucketName: "foo", }, } - err := namedStoreCfg.OverrideConfig(&storeCfg, "store-1", GCS) + err := namedStoreCfg.OverrideConfig(&storeCfg, "store-1") require.NoError(t, err) require.Equal(t, "bar", storeCfg.GCS.BucketName) require.Equal(t, 100, storeCfg.GCS.ChunkBufferSize) diff --git a/pkg/storage/bucket/object_client_adapter.go b/pkg/storage/bucket/object_client_adapter.go index 1ab8f54229124..9930f176163e3 100644 --- a/pkg/storage/bucket/object_client_adapter.go +++ b/pkg/storage/bucket/object_client_adapter.go @@ -35,13 +35,13 @@ func NewObjectClient(ctx context.Context, backend string, cfg ConfigWithNamedSto if st, ok := cfg.NamedStores.LookupStoreType(backend); ok { storeType = st // override config with values from named store config - if err := cfg.NamedStores.OverrideConfig(&storeCfg, backend, storeType); err != nil { + if err := cfg.NamedStores.OverrideConfig(&storeCfg, backend); err != nil { return nil, err } } if disableRetries { 
- if err := cfg.disableRetries(storeType); err != nil { + if err := storeCfg.disableRetries(storeType); err != nil { return nil, fmt.Errorf("create bucket: %w", err) } } @@ -58,7 +58,7 @@ func NewObjectClient(ctx context.Context, backend string, cfg ConfigWithNamedSto return nil, fmt.Errorf("create hedged transport: %w", err) } - if err := cfg.configureTransport(storeType, hedgedTrasport); err != nil { + if err := storeCfg.configureTransport(storeType, hedgedTrasport); err != nil { return nil, fmt.Errorf("create hedged bucket: %w", err) } diff --git a/pkg/storage/bucket/object_client_adapter_test.go b/pkg/storage/bucket/object_client_adapter_test.go index e51b53456337d..7b1fe9c275638 100644 --- a/pkg/storage/bucket/object_client_adapter_test.go +++ b/pkg/storage/bucket/object_client_adapter_test.go @@ -101,7 +101,7 @@ func TestObjectClientAdapter_List(t *testing.T) { Config: Config{ Filesystem: config, }, - }, "test", hedging.Config{}, log.NewNopLogger()) + }, "test", hedging.Config{}, false, log.NewNopLogger()) require.NoError(t, err) storageObj, storageCommonPref, err := client.List(context.Background(), tt.prefix, tt.delimiter) From bab5c88834c1af780b986bd9e6c3909eea3e0554 Mon Sep 17 00:00:00 2001 From: Ashwanth Goli Date: Tue, 12 Nov 2024 18:41:08 +0530 Subject: [PATCH 08/13] fix lint --- pkg/storage/bucket/named_stores_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/bucket/named_stores_test.go b/pkg/storage/bucket/named_stores_test.go index 6af29ef4f825a..0fd6d9f701909 100644 --- a/pkg/storage/bucket/named_stores_test.go +++ b/pkg/storage/bucket/named_stores_test.go @@ -80,7 +80,7 @@ func TestNamedStores_OverrideConfig(t *testing.T) { }, }, } - namedStoreCfg.populateStoreType() + require.NoError(t, namedStoreCfg.populateStoreType()) storeCfg := Config{ GCS: gcs.Config{ From 8683b5eb75d83740a60695f54adeba749e3a5dbb Mon Sep 17 00:00:00 2001 From: Ashwanth Goli Date: Fri, 15 Nov 2024 15:07:57 +0530 Subject: [PATCH 09/13] 
consider named store when creating chunk client --- pkg/storage/factory.go | 33 ++++++++++++++++++++++++++++++--- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/pkg/storage/factory.go b/pkg/storage/factory.go index 90c6f3165d3f8..801796f2d3454 100644 --- a/pkg/storage/factory.go +++ b/pkg/storage/factory.go @@ -472,6 +472,32 @@ func NewIndexClient(component string, periodCfg config.PeriodConfig, tableRange func NewChunkClient(name, component string, cfg Config, schemaCfg config.SchemaConfig, cc congestion.Controller, registerer prometheus.Registerer, clientMetrics ClientMetrics, logger log.Logger) (client.Client, error) { var storeType = name + if cfg.UseThanosObjstore { + // Check if this is a named store and get its type + if st, ok := cfg.ObjectStore.NamedStores.LookupStoreType(name); ok { + storeType = st + } + + var ( + c client.ObjectClient + err error + ) + c, err = NewObjectClient(name, component, cfg, clientMetrics) + if err != nil { + return nil, err + } + + var encoder client.KeyEncoder + if storeType == bucket.Filesystem { + encoder = client.FSEncoder + } else if cfg.CongestionControl.Enabled { + // Apply congestion control wrapper for non-filesystem storage + c = cc.Wrap(c) + } + + return client.NewClientWithMaxParallel(c, encoder, cfg.MaxParallelGetChunk, schemaCfg), nil + } + // lookup storeType for named stores if nsType, ok := cfg.NamedStores.storeType[name]; ok { storeType = nsType @@ -489,6 +515,7 @@ func NewChunkClient(name, component string, cfg Config, schemaCfg config.SchemaC return client.NewClientWithMaxParallel(c, nil, 1, schemaCfg), nil } + case cfg.UseThanosObjstore: case util.StringsContain(types.SupportedStorageTypes, storeType): switch storeType { case types.StorageTypeFileSystem: @@ -616,10 +643,10 @@ func (c *ClientMetrics) Unregister() { // NewObjectClient makes a new StorageClient with the prefix in the front. 
func NewObjectClient(name, component string, cfg Config, clientMetrics ClientMetrics) (client.ObjectClient, error) { if cfg.UseThanosObjstore { - return bucket.NewObjectClient(context.Background(), name, cfg.ObjectStore, component, cfg.Hedging, cfg.CongestionControl.Enabled, util_log.Logger) + return bucket.NewObjectClient(context.Background(), name, cfg.ObjectStore, component, cfg.Hedging, false, util_log.Logger) } - actual, err := internalNewObjectClient(name, component, cfg, clientMetrics) + actual, err := internalNewObjectClient(name, cfg, clientMetrics) if err != nil { return nil, err } @@ -633,7 +660,7 @@ func NewObjectClient(name, component string, cfg Config, clientMetrics ClientMet } // internalNewObjectClient makes the underlying StorageClient of the desired types. -func internalNewObjectClient(storeName, component string, cfg Config, clientMetrics ClientMetrics) (client.ObjectClient, error) { +func internalNewObjectClient(storeName string, cfg Config, clientMetrics ClientMetrics) (client.ObjectClient, error) { var ( namedStore string storeType = storeName From d94a0b0c888c895bb65ee59531a22152704c3530 Mon Sep 17 00:00:00 2001 From: Ashwanth Date: Fri, 15 Nov 2024 17:44:27 +0530 Subject: [PATCH 10/13] Update pkg/storage/bucket/named_stores.go Co-authored-by: Joao Marcal --- pkg/storage/bucket/named_stores.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/bucket/named_stores.go b/pkg/storage/bucket/named_stores.go index afcff982ae5c8..ce6aa97324a49 100644 --- a/pkg/storage/bucket/named_stores.go +++ b/pkg/storage/bucket/named_stores.go @@ -109,7 +109,7 @@ func (ns *NamedStores) OverrideConfig(storeCfg *Config, namedStore string) error case GCS: nsCfg, ok := ns.GCS[namedStore] if !ok { - return fmt.Errorf("Unrecognized named s3 storage config %s", namedStore) + return fmt.Errorf("Unrecognized named gcs storage config %s", namedStore) } storeCfg.GCS = (gcs.Config)(nsCfg) From 23b1036a9456fe97a0faeac04f8d0467dc847431 Mon Sep 17 
00:00:00 2001 From: Ashwanth Date: Fri, 15 Nov 2024 17:44:46 +0530 Subject: [PATCH 11/13] Update pkg/storage/bucket/named_stores.go Co-authored-by: Joao Marcal --- pkg/storage/bucket/named_stores.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/bucket/named_stores.go b/pkg/storage/bucket/named_stores.go index ce6aa97324a49..d9869e3bc9af7 100644 --- a/pkg/storage/bucket/named_stores.go +++ b/pkg/storage/bucket/named_stores.go @@ -116,7 +116,7 @@ func (ns *NamedStores) OverrideConfig(storeCfg *Config, namedStore string) error case S3: nsCfg, ok := ns.S3[namedStore] if !ok { - return fmt.Errorf("Unrecognized named gcs storage config %s", namedStore) + return fmt.Errorf("Unrecognized named s3 storage config %s", namedStore) } storeCfg.S3 = (s3.Config)(nsCfg) From 5be0494a1d147ad28b0a704c08e82f6155c8fd1c Mon Sep 17 00:00:00 2001 From: Ashwanth Date: Fri, 15 Nov 2024 17:45:03 +0530 Subject: [PATCH 12/13] Update pkg/storage/factory.go Co-authored-by: Joao Marcal --- pkg/storage/factory.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/storage/factory.go b/pkg/storage/factory.go index 801796f2d3454..e0477ab1e7acb 100644 --- a/pkg/storage/factory.go +++ b/pkg/storage/factory.go @@ -515,7 +515,6 @@ func NewChunkClient(name, component string, cfg Config, schemaCfg config.SchemaC return client.NewClientWithMaxParallel(c, nil, 1, schemaCfg), nil } - case cfg.UseThanosObjstore: case util.StringsContain(types.SupportedStorageTypes, storeType): switch storeType { case types.StorageTypeFileSystem: From aa1ffdfe63f31aa31682867f9fc5f779dd831f6b Mon Sep 17 00:00:00 2001 From: Ashwanth Goli Date: Mon, 25 Nov 2024 12:26:05 +0530 Subject: [PATCH 13/13] fixup! 
Merge branch 'main' into thanos-named-stores --- pkg/storage/bucket/client.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/storage/bucket/client.go b/pkg/storage/bucket/client.go index 561ff6df3817b..327f6f3f9cab1 100644 --- a/pkg/storage/bucket/client.go +++ b/pkg/storage/bucket/client.go @@ -111,6 +111,8 @@ func (cfg *Config) RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir string cfg.Azure.RegisterFlagsWithPrefix(prefix, f) cfg.Swift.RegisterFlagsWithPrefix(prefix, f) cfg.Filesystem.RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir, f) + cfg.Alibaba.RegisterFlagsWithPrefix(prefix, f) + cfg.BOS.RegisterFlagsWithPrefix(prefix, f) f.StringVar(&cfg.StoragePrefix, prefix+"storage-prefix", "", "Prefix for all objects stored in the backend storage. For simplicity, it may only contain digits and English alphabet letters.") }