Skip to content

Commit 9e87001

Browse files
committed
Merge branch 'main' into gcs-disable-retires
2 parents b54d9ea + cfdd0e5 commit 9e87001

21 files changed

+112
-98
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
1313
- [#145](https://github.com/thanos-io/objstore/pull/145) Include content length in the response of Get and GetRange.
1414

1515
### Fixed
16+
- [#153](https://github.com/thanos-io/objstore/pull/153) Metrics: Fix `objstore_bucket_operation_duration_seconds_*` for `get` and `get_range` operations.
1617
- [#117](https://github.com/thanos-io/objstore/pull/117) Metrics: Fix `objstore_bucket_operation_failures_total` incorrectly incremented if context is cancelled while reading object contents.
1718
- [#115](https://github.com/thanos-io/objstore/pull/115) GCS: Fix creation of bucket with GRPC connections. Also update storage client to `v1.40.0`.
1819
- [#102](https://github.com/thanos-io/objstore/pull/102) Azure: bump azblob sdk to get concurrency fixes.
@@ -53,6 +54,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
5354
- [#128](https://github.com/thanos-io/objstore/pull/128) GCS: Add support for `ChunkSize` for writer.
5455
- [#130](https://github.com/thanos-io/objstore/pull/130) feat: Decouple creating bucket metrics from instrumenting the bucket
5556
- [#147](https://github.com/thanos-io/objstore/pull/147) feat: Add MaxRetries config to cos, gcs and obs.
57+
- [#150](https://github.com/thanos-io/objstore/pull/150) Add support for roundtripper wrapper.
5658

5759
### Changed
5860
- [#38](https://github.com/thanos-io/objstore/pull/38) *: Upgrade minio-go version to `v7.0.45`.

client/factory.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ type BucketConfig struct {
5050

5151
// NewBucket initializes and returns new object storage clients.
5252
// NOTE: confContentYaml can contain secrets.
53-
func NewBucket(logger log.Logger, confContentYaml []byte, component string, rt http.RoundTripper) (objstore.Bucket, error) {
53+
func NewBucket(logger log.Logger, confContentYaml []byte, component string, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (objstore.Bucket, error) {
5454
level.Info(logger).Log("msg", "loading bucket configuration")
5555
bucketConf := &BucketConfig{}
5656
if err := yaml.UnmarshalStrict(confContentYaml, bucketConf); err != nil {
@@ -65,23 +65,23 @@ func NewBucket(logger log.Logger, confContentYaml []byte, component string, rt h
6565
var bucket objstore.Bucket
6666
switch strings.ToUpper(string(bucketConf.Type)) {
6767
case string(GCS):
68-
bucket, err = gcs.NewBucket(context.Background(), logger, config, component, rt)
68+
bucket, err = gcs.NewBucket(context.Background(), logger, config, component, wrapRoundtripper)
6969
case string(S3):
70-
bucket, err = s3.NewBucket(logger, config, component, rt)
70+
bucket, err = s3.NewBucket(logger, config, component, wrapRoundtripper)
7171
case string(AZURE):
72-
bucket, err = azure.NewBucket(logger, config, component, rt)
72+
bucket, err = azure.NewBucket(logger, config, component, wrapRoundtripper)
7373
case string(SWIFT):
74-
bucket, err = swift.NewContainer(logger, config, rt)
74+
bucket, err = swift.NewContainer(logger, config, wrapRoundtripper)
7575
case string(COS):
76-
bucket, err = cos.NewBucket(logger, config, component, rt)
76+
bucket, err = cos.NewBucket(logger, config, component, wrapRoundtripper)
7777
case string(ALIYUNOSS):
78-
bucket, err = oss.NewBucket(logger, config, component, rt)
78+
bucket, err = oss.NewBucket(logger, config, component, wrapRoundtripper)
7979
case string(FILESYSTEM):
8080
bucket, err = filesystem.NewBucketFromConfig(config)
8181
case string(BOS):
8282
bucket, err = bos.NewBucket(logger, config, component)
8383
case string(OCI):
84-
bucket, err = oci.NewBucket(logger, config, rt)
84+
bucket, err = oci.NewBucket(logger, config, wrapRoundtripper)
8585
case string(OBS):
8686
bucket, err = obs.NewBucket(logger, config)
8787
default:

errutil/rt_error.go

+15-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,16 @@
11
package errutil
22

3-
import "net/http"
3+
import (
4+
"net/http"
5+
6+
"github.com/pkg/errors"
7+
)
8+
9+
var rtErr = errors.New("RoundTripper error")
10+
11+
func IsMockedError(err error) bool {
12+
return errors.Is(err, rtErr)
13+
}
414

515
// ErrorRoundTripper is a custom RoundTripper that always returns an error.
616
type ErrorRoundTripper struct {
@@ -10,3 +20,7 @@ type ErrorRoundTripper struct {
1020
func (ert *ErrorRoundTripper) RoundTrip(*http.Request) (*http.Response, error) {
1121
return nil, ert.Err
1222
}
23+
24+
func WrapWithErrRoundtripper(rt http.RoundTripper) http.RoundTripper {
25+
return &ErrorRoundTripper{Err: rtErr}
26+
}

objstore.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,7 @@ func (b *metricBucket) Get(ctx context.Context, name string) (io.ReadCloser, err
573573
if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
574574
b.metrics.opsFailures.WithLabelValues(op).Inc()
575575
}
576-
b.metrics.opsDuration.WithLabelValues(op).Observe(float64(time.Since(start)))
576+
b.metrics.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds())
577577
return nil, err
578578
}
579579
return newTimingReader(
@@ -600,7 +600,7 @@ func (b *metricBucket) GetRange(ctx context.Context, name string, off, length in
600600
if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
601601
b.metrics.opsFailures.WithLabelValues(op).Inc()
602602
}
603-
b.metrics.opsDuration.WithLabelValues(op).Observe(float64(time.Since(start)))
603+
b.metrics.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds())
604604
return nil, err
605605
}
606606
return newTimingReader(

providers/azure/azure.go

+4-7
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ type Bucket struct {
146146
}
147147

148148
// NewBucket returns a new Bucket using the provided Azure config.
149-
func NewBucket(logger log.Logger, azureConfig []byte, component string, rt http.RoundTripper) (*Bucket, error) {
149+
func NewBucket(logger log.Logger, azureConfig []byte, component string, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (*Bucket, error) {
150150
level.Debug(logger).Log("msg", "creating new Azure bucket connection", "component", component)
151151
conf, err := parseConfig(azureConfig)
152152
if err != nil {
@@ -155,19 +155,16 @@ func NewBucket(logger log.Logger, azureConfig []byte, component string, rt http.
155155
if conf.MSIResource != "" {
156156
level.Warn(logger).Log("msg", "The field msi_resource has been deprecated and should no longer be set")
157157
}
158-
return NewBucketWithConfig(logger, conf, component, rt)
158+
return NewBucketWithConfig(logger, conf, component, wrapRoundtripper)
159159
}
160160

161161
// NewBucketWithConfig returns a new Bucket using the provided Azure config struct.
162-
func NewBucketWithConfig(logger log.Logger, conf Config, component string, rt http.RoundTripper) (*Bucket, error) {
163-
if rt != nil {
164-
conf.HTTPConfig.Transport = rt
165-
}
162+
func NewBucketWithConfig(logger log.Logger, conf Config, component string, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (*Bucket, error) {
166163
if err := conf.validate(); err != nil {
167164
return nil, err
168165
}
169166

170-
containerClient, err := getContainerClient(conf)
167+
containerClient, err := getContainerClient(conf, wrapRoundtripper)
171168
if err != nil {
172169
return nil, err
173170
}

providers/azure/azure_test.go

+2-5
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99

1010
"github.com/efficientgo/core/testutil"
1111
"github.com/go-kit/log"
12-
"github.com/pkg/errors"
1312

1413
"github.com/thanos-io/objstore/errutil"
1514
"github.com/thanos-io/objstore/exthttp"
@@ -230,11 +229,9 @@ func TestNewBucketWithErrorRoundTripper(t *testing.T) {
230229
cfg, err := parseConfig(validConfig)
231230
testutil.Ok(t, err)
232231

233-
rt := &errutil.ErrorRoundTripper{Err: errors.New("RoundTripper error")}
234-
235-
_, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test", rt)
232+
_, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test", errutil.WrapWithErrRoundtripper)
236233

237234
// We expect an error from the RoundTripper
238235
testutil.NotOk(t, err)
239-
testutil.Assert(t, errors.Is(err, rt.Err), "Expected RoundTripper error, got: %v", err)
236+
testutil.Assert(t, errutil.IsMockedError(err), "Expected RoundTripper error, got: %v", err)
240237
}

providers/azure/helpers.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import (
1919
// DirDelim is the delimiter used to model a directory structure in an object store bucket.
2020
const DirDelim = "/"
2121

22-
func getContainerClient(conf Config) (*container.Client, error) {
22+
func getContainerClient(conf Config, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (*container.Client, error) {
2323
var rt http.RoundTripper
2424
rt, err := exthttp.DefaultTransport(conf.HTTPConfig)
2525
if err != nil {
@@ -28,6 +28,9 @@ func getContainerClient(conf Config) (*container.Client, error) {
2828
if conf.HTTPConfig.Transport != nil {
2929
rt = conf.HTTPConfig.Transport
3030
}
31+
if wrapRoundtripper != nil {
32+
rt = wrapRoundtripper(rt)
33+
}
3134
opt := &container.ClientOptions{
3235
ClientOptions: azcore.ClientOptions{
3336
Retry: policy.RetryOptions{

providers/bos/bos.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func parseConfig(conf []byte) (Config, error) {
6666

6767
// NewBucket new bos bucket.
6868
func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error) {
69-
// TODO(https://github.com/thanos-io/objstore/pull/140): Add support for custom roundtripper.
69+
// TODO(https://github.com/thanos-io/objstore/pull/150): Add support for roundtripper wrapper.
7070
if logger == nil {
7171
logger = log.NewNopLogger()
7272
}

providers/cos/cos.go

+11-8
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func parseConfig(conf []byte) (Config, error) {
9696
}
9797

9898
// NewBucket returns a new Bucket using the provided cos configuration.
99-
func NewBucket(logger log.Logger, conf []byte, component string, rt http.RoundTripper) (*Bucket, error) {
99+
func NewBucket(logger log.Logger, conf []byte, component string, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (*Bucket, error) {
100100
if logger == nil {
101101
logger = log.NewNopLogger()
102102
}
@@ -105,11 +105,11 @@ func NewBucket(logger log.Logger, conf []byte, component string, rt http.RoundTr
105105
if err != nil {
106106
return nil, errors.Wrap(err, "parsing cos configuration")
107107
}
108-
return NewBucketWithConfig(logger, config, component, rt)
108+
return NewBucketWithConfig(logger, config, component, wrapRoundtripper)
109109
}
110110

111111
// NewBucketWithConfig returns a new Bucket using the provided cos config values.
112-
func NewBucketWithConfig(logger log.Logger, config Config, component string, rt http.RoundTripper) (*Bucket, error) {
112+
func NewBucketWithConfig(logger log.Logger, config Config, component string, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (*Bucket, error) {
113113
if err := config.validate(); err != nil {
114114
return nil, errors.Wrap(err, "validate cos configuration")
115115
}
@@ -128,19 +128,22 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string, rt
128128
}
129129
}
130130
b := &cos.BaseURL{BucketURL: bucketURL}
131-
var tpt http.RoundTripper
132-
tpt, err = exthttp.DefaultTransport(config.HTTPConfig)
131+
var rt http.RoundTripper
132+
rt, err = exthttp.DefaultTransport(config.HTTPConfig)
133133
if err != nil {
134134
return nil, err
135135
}
136-
if rt != nil {
137-
tpt = rt
136+
if config.HTTPConfig.Transport != nil {
137+
rt = config.HTTPConfig.Transport
138+
}
139+
if wrapRoundtripper != nil {
140+
rt = wrapRoundtripper(rt)
138141
}
139142
client := cos.NewClient(b, &http.Client{
140143
Transport: &cos.AuthorizationTransport{
141144
SecretID: config.SecretId,
142145
SecretKey: config.SecretKey,
143-
Transport: tpt,
146+
Transport: rt,
144147
},
145148
})
146149

providers/cos/cos_test.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010

1111
"github.com/efficientgo/core/testutil"
1212
"github.com/go-kit/log"
13-
"github.com/pkg/errors"
1413
"github.com/prometheus/common/model"
1514

1615
"github.com/thanos-io/objstore/errutil"
@@ -150,12 +149,11 @@ func TestNewBucketWithErrorRoundTripper(t *testing.T) {
150149
SecretId: "sid",
151150
SecretKey: "skey",
152151
}
153-
rt := &errutil.ErrorRoundTripper{Err: errors.New("RoundTripper error")}
154152

155-
bkt, err := NewBucketWithConfig(log.NewNopLogger(), config, "test", rt)
153+
bkt, err := NewBucketWithConfig(log.NewNopLogger(), config, "test", errutil.WrapWithErrRoundtripper)
156154
testutil.Ok(t, err)
157155
_, err = bkt.Get(context.Background(), "Test")
158156
// We expect an error from the RoundTripper
159157
testutil.NotOk(t, err)
160-
testutil.Assert(t, errors.Is(err, rt.Err), "Expected RoundTripper error, got: %v", err)
158+
testutil.Assert(t, errutil.IsMockedError(err), "Expected RoundTripper error, got: %v", err)
161159
}

providers/gcs/gcs.go

+9-8
Original file line numberDiff line numberDiff line change
@@ -82,22 +82,20 @@ func parseConfig(conf []byte) (Config, error) {
8282
}
8383

8484
// NewBucket returns a new Bucket against the given bucket handle.
85-
func NewBucket(ctx context.Context, logger log.Logger, conf []byte, component string, rt http.RoundTripper) (*Bucket, error) {
85+
func NewBucket(ctx context.Context, logger log.Logger, conf []byte, component string, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (*Bucket, error) {
8686
config, err := parseConfig(conf)
8787
if err != nil {
8888
return nil, err
8989
}
90-
return NewBucketWithConfig(ctx, logger, config, component, rt)
90+
return NewBucketWithConfig(ctx, logger, config, component, wrapRoundtripper)
9191
}
9292

9393
// NewBucketWithConfig returns a new Bucket with gcs Config struct.
94-
func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, component string, rt http.RoundTripper) (*Bucket, error) {
94+
func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, component string, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (*Bucket, error) {
9595
if gc.Bucket == "" {
9696
return nil, errors.New("missing Google Cloud Storage bucket name for stored blocks")
9797
}
98-
if rt != nil {
99-
gc.HTTPConfig.Transport = rt
100-
}
98+
10199
var opts []option.ClientOption
102100

103101
// If ServiceAccount is provided, use them in GCS client, otherwise fallback to Google default logic.
@@ -117,7 +115,7 @@ func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, comp
117115

118116
if !gc.UseGRPC {
119117
var err error
120-
opts, err = appendHttpOptions(gc, opts)
118+
opts, err = appendHttpOptions(gc, opts, wrapRoundtripper)
121119
if err != nil {
122120
return nil, err
123121
}
@@ -126,7 +124,7 @@ func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, comp
126124
return newBucket(ctx, logger, gc, opts)
127125
}
128126

129-
func appendHttpOptions(gc Config, opts []option.ClientOption) ([]option.ClientOption, error) {
127+
func appendHttpOptions(gc Config, opts []option.ClientOption, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) ([]option.ClientOption, error) {
130128
// Check if a roundtripper has been set in the config
131129
// otherwise build the default transport.
132130
var rt http.RoundTripper
@@ -137,6 +135,9 @@ func appendHttpOptions(gc Config, opts []option.ClientOption) ([]option.ClientOp
137135
if gc.HTTPConfig.Transport != nil {
138136
rt = gc.HTTPConfig.Transport
139137
}
138+
if wrapRoundtripper != nil {
139+
rt = wrapRoundtripper(rt)
140+
}
140141

141142
// GCS uses some defaults when "options.WithHTTPClient" is not used that are important when we call
142143
// htransport.NewTransport namely the scopes that are then used for OAth authentication. So to build our own

providers/gcs/gcs_test.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"github.com/efficientgo/core/testutil"
1616
"github.com/fullstorydev/emulators/storage/gcsemu"
1717
"github.com/go-kit/log"
18-
"github.com/pkg/errors"
1918
"github.com/prometheus/common/model"
2019
"github.com/thanos-io/objstore/errutil"
2120
"google.golang.org/api/option"
@@ -161,7 +160,6 @@ http_config:
161160
}
162161

163162
func TestNewBucketWithErrorRoundTripper(t *testing.T) {
164-
rt := &errutil.ErrorRoundTripper{Err: errors.New("RoundTripper error")}
165163
cfg := Config{
166164
Bucket: "test-bucket",
167165
ServiceAccount: "",
@@ -174,9 +172,9 @@ func TestNewBucketWithErrorRoundTripper(t *testing.T) {
174172
err = os.Setenv("STORAGE_EMULATOR_HOST", svr.Addr)
175173
testutil.Ok(t, err)
176174

177-
bkt, err := NewBucketWithConfig(context.Background(), log.NewNopLogger(), cfg, "test-bucket", rt)
175+
bkt, err := NewBucketWithConfig(context.Background(), log.NewNopLogger(), cfg, "test-bucket", errutil.WrapWithErrRoundtripper)
178176
testutil.Ok(t, err)
179177
_, err = bkt.Get(context.Background(), "test-bucket")
180178
testutil.NotOk(t, err)
181-
testutil.Assert(t, errors.Is(err, rt.Err), "Expected RoundTripper error, got: %v", err)
179+
testutil.Assert(t, errutil.IsMockedError(err), "Expected RoundTripper error, got: %v", err)
182180
}

providers/obs/obs.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ type Bucket struct {
7676
}
7777

7878
func NewBucket(logger log.Logger, conf []byte) (*Bucket, error) {
79-
// TODO(https://github.com/thanos-io/objstore/pull/140): Add support for custom roundtripper.
79+
// TODO(https://github.com/thanos-io/objstore/pull/150): Add support for roundtripper wrapper.
8080
config, err := parseConfig(conf)
8181
if err != nil {
8282
return nil, errors.Wrap(err, "parsing cos configuration")

providers/oci/oci.go

+9-6
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ func (b *Bucket) deleteBucket(ctx context.Context) (err error) {
298298
}
299299

300300
// NewBucket returns a new Bucket using the provided oci config values.
301-
func NewBucket(logger log.Logger, ociConfig []byte, rt http.RoundTripper) (*Bucket, error) {
301+
func NewBucket(logger log.Logger, ociConfig []byte, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (*Bucket, error) {
302302
level.Debug(logger).Log("msg", "creating new oci bucket connection")
303303
var config = DefaultConfig
304304
var configurationProvider common.ConfigurationProvider
@@ -344,13 +344,16 @@ func NewBucket(logger log.Logger, ociConfig []byte, rt http.RoundTripper) (*Buck
344344
if err != nil {
345345
return nil, errors.Wrapf(err, "unable to create ObjectStorage client with the given oci configurations")
346346
}
347-
348-
config.HTTPConfig.Transport = CustomTransport(config)
349-
if rt != nil {
350-
config.HTTPConfig.Transport = rt
347+
var rt http.RoundTripper
348+
rt = CustomTransport(config)
349+
if config.HTTPConfig.Transport != nil {
350+
rt = config.HTTPConfig.Transport
351+
}
352+
if wrapRoundtripper != nil {
353+
rt = wrapRoundtripper(rt)
351354
}
352355
httpClient := http.Client{
353-
Transport: config.HTTPConfig.Transport,
356+
Transport: rt,
354357
Timeout: config.HTTPConfig.ClientTimeout,
355358
}
356359
client.HTTPClient = &httpClient

providers/oci/oci_test.go

+2-5
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55

66
"github.com/efficientgo/core/testutil"
77
"github.com/go-kit/log"
8-
"github.com/pkg/errors"
98
"github.com/thanos-io/objstore/errutil"
109
"gopkg.in/yaml.v2"
1110
)
@@ -38,10 +37,8 @@ G6aFKaqQfOXKCyWoUiVknQJAXrlgySFci/2ueKlIE1QqIiLSZ8V8OlpFLRnb1pzI
3837
ociConfig, err := yaml.Marshal(config)
3938
testutil.Ok(t, err)
4039

41-
rt := &errutil.ErrorRoundTripper{Err: errors.New("RoundTripper error")}
42-
43-
_, err = NewBucket(log.NewNopLogger(), ociConfig, rt)
40+
_, err = NewBucket(log.NewNopLogger(), ociConfig, errutil.WrapWithErrRoundtripper)
4441
// We expect an error from the RoundTripper
4542
testutil.NotOk(t, err)
46-
testutil.Assert(t, errors.Is(err, rt.Err), "Expected RoundTripper error, got: %v", err)
43+
testutil.Assert(t, errutil.IsMockedError(err), "Expected RoundTripper error, got: %v", err)
4744
}

0 commit comments

Comments
 (0)