Skip to content

Commit b872246

Browse files
feat(storage): AWS backend using thanos.io/objstore (#11221)
Co-authored-by: Ashwanth Goli <[email protected]>
1 parent 51c42e8 commit b872246

File tree

8 files changed

+344
-187
lines changed

8 files changed

+344
-187
lines changed

pkg/storage/bucket/client.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,9 @@ func (cfg *StorageBackendConfig) RegisterFlagsWithPrefix(prefix string, f *flag.
8484
}
8585

8686
func (cfg *StorageBackendConfig) Validate() error {
87-
// TODO: enable validation when s3 flags are registered
88-
// if err := cfg.S3.Validate(); err != nil {
89-
// return err
90-
//}
87+
if err := cfg.S3.Validate(); err != nil {
88+
return err
89+
}
9190

9291
return nil
9392
}

pkg/storage/bucket/s3/bucket_client.go

+32-10
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"github.com/go-kit/log"
55
"github.com/prometheus/common/model"
66
"github.com/thanos-io/objstore"
7+
"github.com/thanos-io/objstore/exthttp"
78
"github.com/thanos-io/objstore/providers/s3"
89
)
910

@@ -38,17 +39,28 @@ func newS3Config(cfg Config) (s3.Config, error) {
3839
return s3.Config{}, err
3940
}
4041

42+
putUserMetadata := map[string]string{}
43+
44+
if cfg.StorageClass != "" {
45+
putUserMetadata[awsStorageClassHeader] = cfg.StorageClass
46+
}
47+
4148
return s3.Config{
42-
Bucket: cfg.BucketName,
43-
Endpoint: cfg.Endpoint,
44-
Region: cfg.Region,
45-
AccessKey: cfg.AccessKeyID,
46-
SecretKey: cfg.SecretAccessKey.String(),
47-
SessionToken: cfg.SessionToken.String(),
48-
Insecure: cfg.Insecure,
49-
DisableDualstack: cfg.DisableDualstack,
50-
SSEConfig: sseCfg,
51-
PutUserMetadata: map[string]string{awsStorageClassHeader: cfg.StorageClass},
49+
Bucket: cfg.BucketName,
50+
Endpoint: cfg.Endpoint,
51+
Region: cfg.Region,
52+
AccessKey: cfg.AccessKeyID,
53+
SecretKey: cfg.SecretAccessKey.String(),
54+
SessionToken: cfg.SessionToken.String(),
55+
Insecure: cfg.Insecure,
56+
PutUserMetadata: putUserMetadata,
57+
SendContentMd5: cfg.SendContentMd5,
58+
SSEConfig: sseCfg,
59+
DisableDualstack: !cfg.DualstackEnabled,
60+
ListObjectsVersion: cfg.ListObjectsVersion,
61+
BucketLookupType: cfg.BucketLookupType,
62+
AWSSDKAuth: cfg.NativeAWSAuthEnabled,
63+
PartSize: cfg.PartSize,
5264
HTTPConfig: s3.HTTPConfig{
5365
IdleConnTimeout: model.Duration(cfg.HTTP.IdleConnTimeout),
5466
ResponseHeaderTimeout: model.Duration(cfg.HTTP.ResponseHeaderTimeout),
@@ -59,6 +71,16 @@ func newS3Config(cfg Config) (s3.Config, error) {
5971
MaxIdleConnsPerHost: cfg.HTTP.MaxIdleConnsPerHost,
6072
MaxConnsPerHost: cfg.HTTP.MaxConnsPerHost,
6173
Transport: cfg.HTTP.Transport,
74+
TLSConfig: exthttp.TLSConfig{
75+
CAFile: cfg.HTTP.TLSConfig.CAPath,
76+
CertFile: cfg.HTTP.TLSConfig.CertPath,
77+
KeyFile: cfg.HTTP.TLSConfig.KeyPath,
78+
ServerName: cfg.HTTP.TLSConfig.ServerName,
79+
},
80+
},
81+
TraceConfig: s3.TraceConfig{
82+
Enable: cfg.TraceConfig.Enabled,
6283
},
84+
STSEndpoint: cfg.STSEndpoint,
6385
}, nil
6486
}

pkg/storage/bucket/s3/config.go

+131-33
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,20 @@ import (
55
"flag"
66
"fmt"
77
"net/http"
8+
"slices"
89
"strings"
10+
"time"
911

12+
s3_service "github.com/aws/aws-sdk-go/service/s3"
1013
"github.com/grafana/dskit/flagext"
1114
"github.com/minio/minio-go/v7/pkg/encrypt"
1215
"github.com/pkg/errors"
1316
"github.com/thanos-io/objstore/providers/s3"
1417

15-
bucket_http "github.com/grafana/loki/v3/pkg/storage/bucket/http"
16-
"github.com/grafana/loki/v3/pkg/storage/common/aws"
1718
"github.com/grafana/loki/v3/pkg/util"
1819
)
1920

2021
const (
21-
// Signature Version 2 is being turned off (deprecated) in Amazon S3. Amazon S3 will then only accept API requests that are signed using Signature Version 4.
22-
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingAWSSDK.html#UsingAWSSDK-sig2-deprecation
23-
SignatureVersionV4 = "v4"
24-
2522
// SSEKMS config type constant to configure S3 server side encryption using KMS
2623
// https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingKMSEncryption.html
2724
SSEKMS = "SSE-KMS"
@@ -32,41 +29,99 @@ const (
3229
)
3330

3431
var (
35-
supportedSignatureVersions = []string{SignatureVersionV4}
36-
supportedSSETypes = []string{SSEKMS, SSES3}
37-
errUnsupportedSignatureVersion = errors.New("unsupported signature version")
38-
errUnsupportedSSEType = errors.New("unsupported S3 SSE type")
39-
errInvalidSSEContext = errors.New("invalid S3 SSE encryption context")
32+
supportedSSETypes = []string{SSEKMS, SSES3}
33+
supportedStorageClasses = s3_service.ObjectStorageClass_Values()
34+
supportedBucketLookupTypes = thanosS3BucketLookupTypesValues()
35+
36+
errUnsupportedSSEType = errors.New("unsupported S3 SSE type")
37+
errUnsupportedStorageClass = fmt.Errorf("unsupported S3 storage class (supported values: %s)", strings.Join(supportedStorageClasses, ", "))
38+
errInvalidSSEContext = errors.New("invalid S3 SSE encryption context")
39+
errInvalidEndpointPrefix = errors.New("the endpoint must not prefixed with the bucket name")
40+
errInvalidSTSEndpoint = errors.New("sts-endpoint must be a valid url")
4041
)
4142

43+
var thanosS3BucketLookupTypes = map[string]s3.BucketLookupType{
44+
s3.AutoLookup.String(): s3.AutoLookup,
45+
s3.VirtualHostLookup.String(): s3.VirtualHostLookup,
46+
s3.PathLookup.String(): s3.PathLookup,
47+
}
48+
49+
func thanosS3BucketLookupTypesValues() (list []string) {
50+
for k := range thanosS3BucketLookupTypes {
51+
list = append(list, k)
52+
}
53+
// Sort the list so that the help text, where it is used, has a stable order.
54+
slices.Sort(list)
55+
return list
56+
}
57+
4258
// HTTPConfig stores the http.Transport configuration for the s3 minio client.
4359
type HTTPConfig struct {
44-
bucket_http.Config `yaml:",inline"`
60+
IdleConnTimeout time.Duration `yaml:"idle_conn_timeout" category:"advanced"`
61+
ResponseHeaderTimeout time.Duration `yaml:"response_header_timeout" category:"advanced"`
62+
InsecureSkipVerify bool `yaml:"insecure_skip_verify" category:"advanced"`
63+
TLSHandshakeTimeout time.Duration `yaml:"tls_handshake_timeout" category:"advanced"`
64+
ExpectContinueTimeout time.Duration `yaml:"expect_continue_timeout" category:"advanced"`
65+
MaxIdleConns int `yaml:"max_idle_connections" category:"advanced"`
66+
MaxIdleConnsPerHost int `yaml:"max_idle_connections_per_host" category:"advanced"`
67+
MaxConnsPerHost int `yaml:"max_connections_per_host" category:"advanced"`
4568

4669
// Allow upstream callers to inject a round tripper
4770
Transport http.RoundTripper `yaml:"-"`
71+
72+
TLSConfig TLSConfig `yaml:",inline"`
73+
}
74+
75+
// TLSConfig configures the options for TLS connections.
76+
type TLSConfig struct {
77+
CAPath string `yaml:"tls_ca_path" category:"advanced"`
78+
CertPath string `yaml:"tls_cert_path" category:"advanced"`
79+
KeyPath string `yaml:"tls_key_path" category:"advanced"`
80+
ServerName string `yaml:"tls_server_name" category:"advanced"`
4881
}
4982

5083
// RegisterFlagsWithPrefix registers the flags for s3 storage with the provided prefix
5184
func (cfg *HTTPConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
52-
cfg.Config.RegisterFlagsWithPrefix(prefix+"s3.", f)
85+
f.DurationVar(&cfg.IdleConnTimeout, prefix+"s3.http.idle-conn-timeout", 90*time.Second, "The time an idle connection will remain idle before closing.")
86+
f.DurationVar(&cfg.ResponseHeaderTimeout, prefix+"s3.http.response-header-timeout", 2*time.Minute, "The amount of time the client will wait for a servers response headers.")
87+
f.BoolVar(&cfg.InsecureSkipVerify, prefix+"s3.http.insecure-skip-verify", false, "If the client connects to S3 via HTTPS and this option is enabled, the client will accept any certificate and hostname.")
88+
f.DurationVar(&cfg.TLSHandshakeTimeout, prefix+"s3.tls-handshake-timeout", 10*time.Second, "Maximum time to wait for a TLS handshake. 0 means no limit.")
89+
f.DurationVar(&cfg.ExpectContinueTimeout, prefix+"s3.expect-continue-timeout", 1*time.Second, "The time to wait for a server's first response headers after fully writing the request headers if the request has an Expect header. 0 to send the request body immediately.")
90+
f.IntVar(&cfg.MaxIdleConns, prefix+"s3.max-idle-connections", 100, "Maximum number of idle (keep-alive) connections across all hosts. 0 means no limit.")
91+
f.IntVar(&cfg.MaxIdleConnsPerHost, prefix+"s3.max-idle-connections-per-host", 100, "Maximum number of idle (keep-alive) connections to keep per-host. If 0, a built-in default value is used.")
92+
f.IntVar(&cfg.MaxConnsPerHost, prefix+"s3.max-connections-per-host", 0, "Maximum number of connections per host. 0 means no limit.")
93+
cfg.TLSConfig.RegisterFlagsWithPrefix(prefix, f)
94+
}
95+
96+
// RegisterFlagsWithPrefix registers the TLS flags for the s3 HTTP client with the provided prefix.
97+
func (cfg *TLSConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
98+
f.StringVar(&cfg.CAPath, prefix+"s3.http.tls-ca-path", "", "Path to the CA certificates to validate server certificate against. If not set, the host's root CA certificates are used.")
99+
f.StringVar(&cfg.CertPath, prefix+"s3.http.tls-cert-path", "", "Path to the client certificate, which will be used for authenticating with the server. Also requires the key path to be configured.")
100+
f.StringVar(&cfg.KeyPath, prefix+"s3.http.tls-key-path", "", "Path to the key for the client certificate. Also requires the client certificate to be configured.")
101+
f.StringVar(&cfg.ServerName, prefix+"s3.http.tls-server-name", "", "Override the expected name on the server certificate.")
53102
}
54103

55104
// Config holds the config options for an S3 backend
56105
type Config struct {
57-
Endpoint string `yaml:"endpoint"`
58-
Region string `yaml:"region"`
59-
BucketName string `yaml:"bucket_name"`
60-
SecretAccessKey flagext.Secret `yaml:"secret_access_key"`
61-
SessionToken flagext.Secret `yaml:"session_token"`
62-
AccessKeyID string `yaml:"access_key_id"`
63-
Insecure bool `yaml:"insecure"`
64-
DisableDualstack bool `yaml:"disable_dualstack"`
65-
SignatureVersion string `yaml:"signature_version"`
66-
StorageClass string `yaml:"storage_class"`
106+
Endpoint string `yaml:"endpoint"`
107+
Region string `yaml:"region"`
108+
BucketName string `yaml:"bucket_name"`
109+
SecretAccessKey flagext.Secret `yaml:"secret_access_key"`
110+
AccessKeyID string `yaml:"access_key_id"`
111+
SessionToken flagext.Secret `yaml:"session_token"`
112+
Insecure bool `yaml:"insecure" category:"advanced"`
113+
ListObjectsVersion string `yaml:"list_objects_version" category:"advanced"`
114+
BucketLookupType s3.BucketLookupType `yaml:"bucket_lookup_type" category:"advanced"`
115+
DualstackEnabled bool `yaml:"dualstack_enabled" category:"experimental"`
116+
StorageClass string `yaml:"storage_class" category:"experimental"`
117+
NativeAWSAuthEnabled bool `yaml:"native_aws_auth_enabled" category:"experimental"`
118+
PartSize uint64 `yaml:"part_size" category:"experimental"`
119+
SendContentMd5 bool `yaml:"send_content_md5" category:"experimental"`
120+
STSEndpoint string `yaml:"sts_endpoint"`
67121

68-
SSE SSEConfig `yaml:"sse"`
69-
HTTP HTTPConfig `yaml:"http"`
122+
SSE SSEConfig `yaml:"sse"`
123+
HTTP HTTPConfig `yaml:"http"`
124+
TraceConfig TraceConfig `yaml:"trace"`
70125
}
71126

72127
// RegisterFlags registers the flags for s3 storage with the provided prefix
@@ -83,21 +138,32 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
83138
f.StringVar(&cfg.Region, prefix+"s3.region", "", "S3 region. If unset, the client will issue a S3 GetBucketLocation API call to autodetect it.")
84139
f.StringVar(&cfg.Endpoint, prefix+"s3.endpoint", "", "The S3 bucket endpoint. It could be an AWS S3 endpoint listed at https://docs.aws.amazon.com/general/latest/gr/s3.html or the address of an S3-compatible service in hostname:port format.")
85140
f.BoolVar(&cfg.Insecure, prefix+"s3.insecure", false, "If enabled, use http:// for the S3 endpoint instead of https://. This could be useful in local dev/test environments while using an S3-compatible backend storage, like Minio.")
86-
f.BoolVar(&cfg.DisableDualstack, prefix+"s3.disable-dualstack", false, "Disable forcing S3 dualstack endpoint usage.")
87-
f.StringVar(&cfg.SignatureVersion, prefix+"s3.signature-version", SignatureVersionV4, fmt.Sprintf("The signature version to use for authenticating against S3. Supported values are: %s.", strings.Join(supportedSignatureVersions, ", ")))
88-
f.StringVar(&cfg.StorageClass, prefix+"s3.storage-class", aws.StorageClassStandard, "The S3 storage class to use. Details can be found at https://aws.amazon.com/s3/storage-classes/.")
141+
f.StringVar(&cfg.ListObjectsVersion, prefix+"s3.list-objects-version", "", "Use a specific version of the S3 list object API. Supported values are v1 or v2. Default is unset.")
142+
f.StringVar(&cfg.StorageClass, prefix+"s3.storage-class", "", "The S3 storage class to use, not set by default. Details can be found at https://aws.amazon.com/s3/storage-classes/. Supported values are: "+strings.Join(supportedStorageClasses, ", "))
143+
f.BoolVar(&cfg.NativeAWSAuthEnabled, prefix+"s3.native-aws-auth-enabled", false, "If enabled, it will use the default authentication methods of the AWS SDK for go based on known environment variables and known AWS config files.")
144+
f.Uint64Var(&cfg.PartSize, prefix+"s3.part-size", 0, "The minimum file size in bytes used for multipart uploads. If 0, the value is optimally computed for each object.")
145+
f.BoolVar(&cfg.SendContentMd5, prefix+"s3.send-content-md5", false, "If enabled, a Content-MD5 header is sent with S3 Put Object requests. Consumes more resources to compute the MD5, but may improve compatibility with object storage services that do not support checksums.")
146+
f.Var(newBucketLookupTypeValue(s3.AutoLookup, &cfg.BucketLookupType), prefix+"s3.bucket-lookup-type", fmt.Sprintf("Bucket lookup style type, used to access bucket in S3-compatible service. Default is auto. Supported values are: %s.", strings.Join(supportedBucketLookupTypes, ", ")))
147+
f.BoolVar(&cfg.DualstackEnabled, prefix+"s3.dualstack-enabled", true, "When enabled, direct all AWS S3 requests to the dual-stack IPv4/IPv6 endpoint for the configured region.")
148+
f.StringVar(&cfg.STSEndpoint, prefix+"s3.sts-endpoint", "", "Accessing S3 resources using temporary, secure credentials provided by AWS Security Token Service.")
89149
cfg.SSE.RegisterFlagsWithPrefix(prefix+"s3.sse.", f)
90150
cfg.HTTP.RegisterFlagsWithPrefix(prefix, f)
151+
cfg.TraceConfig.RegisterFlagsWithPrefix(prefix+"s3.trace.", f)
91152
}
92153

93154
// Validate validates the config and returns an error on failure.
94155
func (cfg *Config) Validate() error {
95-
if !util.StringsContain(supportedSignatureVersions, cfg.SignatureVersion) {
96-
return errUnsupportedSignatureVersion
156+
if cfg.Endpoint != "" {
157+
endpoint := strings.Split(cfg.Endpoint, ".")
158+
if cfg.BucketName != "" && endpoint[0] != "" && endpoint[0] == cfg.BucketName {
159+
return errInvalidEndpointPrefix
160+
}
97161
}
98-
99-
if err := aws.ValidateStorageClass(cfg.StorageClass); err != nil {
100-
return err
162+
if cfg.STSEndpoint != "" && !util.IsValidURL(cfg.STSEndpoint) {
163+
return errInvalidSTSEndpoint
164+
}
165+
if !slices.Contains(supportedStorageClasses, cfg.StorageClass) && cfg.StorageClass != "" {
166+
return errUnsupportedStorageClass
101167
}
102168

103169
return cfg.SSE.Validate()
@@ -191,3 +257,35 @@ func parseKMSEncryptionContext(data string) (map[string]string, error) {
191257
err := errors.Wrap(json.Unmarshal([]byte(data), &decoded), "unable to parse KMS encryption context")
192258
return decoded, err
193259
}
260+
261+
type TraceConfig struct {
262+
Enabled bool `yaml:"enabled" category:"advanced"`
263+
}
264+
265+
func (cfg *TraceConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
266+
f.BoolVar(&cfg.Enabled, prefix+"enabled", false, "When enabled, low-level S3 HTTP operation information is logged at the debug level.")
267+
}
268+
269+
// bucketLookupTypeValue is an adapter between s3.BucketLookupType and flag.Value.
270+
type bucketLookupTypeValue s3.BucketLookupType
271+
272+
func newBucketLookupTypeValue(value s3.BucketLookupType, p *s3.BucketLookupType) *bucketLookupTypeValue {
273+
*p = value
274+
return (*bucketLookupTypeValue)(p)
275+
}
276+
277+
func (v *bucketLookupTypeValue) String() string {
278+
if v == nil {
279+
return s3.AutoLookup.String()
280+
}
281+
return s3.BucketLookupType(*v).String()
282+
}
283+
284+
func (v *bucketLookupTypeValue) Set(s string) error {
285+
t, ok := thanosS3BucketLookupTypes[s]
286+
if !ok {
287+
return fmt.Errorf("unsupported bucket lookup type: %s", s)
288+
}
289+
*v = bucketLookupTypeValue(t)
290+
return nil
291+
}

0 commit comments

Comments
 (0)