Skip to content

Commit 62fd1c8

Browse files
DylanGuedesLashaJini
authored andcommitted
feat: Introduce new ObjectExistsWithSize API to (grafana#14268)
**What this PR does / why we need it**: Introduce a new `ObjectExistsWithSize` API to our object storage interface. This is the same as `ObjectExists` but with the object size as part of the return value. This is useful to compare the object size present.
1 parent 89fe86d commit 62fd1c8

17 files changed

+178
-43
lines changed

pkg/ingester-rf1/objstore/storage.go

+8
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,14 @@ func (m *Multi) GetStoreFor(ts model.Time) (client.ObjectClient, error) {
7070
return nil, fmt.Errorf("no store found for timestamp %s", ts)
7171
}
7272

73+
func (m *Multi) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
74+
s, err := m.GetStoreFor(model.Now())
75+
if err != nil {
76+
return false, 0, err
77+
}
78+
return s.ObjectExistsWithSize(ctx, objectKey)
79+
}
80+
7381
func (m *Multi) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
7482
s, err := m.GetStoreFor(model.Now())
7583
if err != nil {

pkg/storage/chunk/client/alibaba/oss_object_client.go

+15-4
Original file line numberDiff line numberDiff line change
@@ -73,16 +73,27 @@ func (s *OssObjectClient) Stop() {
7373
}
7474

7575
func (s *OssObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
76+
exists, _, err := s.ObjectExistsWithSize(ctx, objectKey)
77+
return exists, err
78+
}
79+
80+
func (s *OssObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
7681
var options []oss.Option
82+
var objectSize int64
7783
err := instrument.CollectedRequest(ctx, "OSS.ObjectExists", ossRequestDuration, instrument.ErrorCode, func(_ context.Context) error {
78-
_, requestErr := s.defaultBucket.GetObjectMeta(objectKey, options...)
79-
return requestErr
84+
headers, requestErr := s.defaultBucket.GetObjectMeta(objectKey, options...)
85+
if requestErr != nil {
86+
return requestErr
87+
}
88+
89+
objectSize, _ = strconv.ParseInt(headers.Get(oss.HTTPHeaderContentLength), 10, 64)
90+
return nil
8091
})
8192
if err != nil {
82-
return false, err
93+
return false, 0, err
8394
}
8495

85-
return true, nil
96+
return true, objectSize, nil
8697
}
8798

8899
// GetObject returns a reader and the size for the specified object key from the configured OSS bucket.

pkg/storage/chunk/client/aws/s3_storage_client.go

+19-7
Original file line numberDiff line numberDiff line change
@@ -310,37 +310,49 @@ func buckets(cfg S3Config) ([]string, error) {
310310
func (a *S3ObjectClient) Stop() {}
311311

312312
func (a *S3ObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
313+
exists, _, err := a.ObjectExistsWithSize(ctx, objectKey)
314+
return exists, err
315+
}
316+
317+
func (a *S3ObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
313318
var lastErr error
319+
var objectSize int64
314320

315321
retries := backoff.New(ctx, a.cfg.BackoffConfig)
316322
for retries.Ongoing() {
317323
if ctx.Err() != nil {
318-
return false, errors.Wrap(ctx.Err(), "ctx related error during s3 objectExists")
324+
return false, 0, errors.Wrap(ctx.Err(), "ctx related error during s3 objectExists")
319325
}
320326
lastErr = instrument.CollectedRequest(ctx, "S3.ObjectExists", s3RequestDuration, instrument.ErrorCode, func(_ context.Context) error {
321327
headObjectInput := &s3.HeadObjectInput{
322328
Bucket: aws.String(a.bucketFromKey(objectKey)),
323329
Key: aws.String(objectKey),
324330
}
325-
_, requestErr := a.S3.HeadObject(headObjectInput)
326-
return requestErr
331+
headOutput, requestErr := a.S3.HeadObject(headObjectInput)
332+
if requestErr != nil {
333+
return requestErr
334+
}
335+
if headOutput != nil && headOutput.ContentLength != nil {
336+
objectSize = *headOutput.ContentLength
337+
}
338+
return nil
327339
})
328340
if lastErr == nil {
329-
return true, nil
341+
return true, 0, nil
330342
}
331343

332344
if a.IsObjectNotFoundErr(lastErr) {
333-
return false, lastErr
345+
return false, 0, lastErr
334346
}
335347

336348
retries.Wait()
337349
}
338350

339351
if lastErr != nil {
340-
return false, lastErr
352+
return false, 0, lastErr
341353
}
342354

343-
return true, nil
355+
return true, objectSize, nil
344356
}
345357

346358
// DeleteObject deletes the specified objectKey from the appropriate S3 bucket

pkg/storage/chunk/client/aws/s3_storage_client_test.go

+18
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,15 @@ func Test_RetryLogic(t *testing.T) {
334334
return err
335335
},
336336
},
337+
{
338+
"object exists with size with retries",
339+
3,
340+
true,
341+
func(c *S3ObjectClient) error {
342+
_, _, err := c.ObjectExistsWithSize(context.Background(), "foo")
343+
return err
344+
},
345+
},
337346
{
338347
"object doesn't exist with retries",
339348
3,
@@ -343,6 +352,15 @@ func Test_RetryLogic(t *testing.T) {
343352
return err
344353
},
345354
},
355+
{
356+
"object doesn't exist (with size) with retries",
357+
3,
358+
false,
359+
func(c *S3ObjectClient) error {
360+
_, _, err := c.ObjectExistsWithSize(context.Background(), "foo")
361+
return err
362+
},
363+
},
346364
} {
347365
t.Run(tc.name, func(t *testing.T) {
348366
callCount := atomic.NewInt32(0)

pkg/storage/chunk/client/azure/blob_storage_client.go

+19-4
Original file line numberDiff line numberDiff line change
@@ -220,20 +220,35 @@ func NewBlobStorage(cfg *BlobStorageConfig, metrics BlobStorageMetrics, hedgingC
220220
func (b *BlobStorage) Stop() {}
221221

222222
func (b *BlobStorage) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
223+
exists, _, err := b.ObjectExistsWithSize(ctx, objectKey)
224+
return exists, err
225+
}
226+
227+
func (b *BlobStorage) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
228+
var objectSize int64
223229
err := loki_instrument.TimeRequest(ctx, "azure.ObjectExists", instrument.NewHistogramCollector(b.metrics.requestDuration), instrument.ErrorCode, func(ctx context.Context) error {
224230
blockBlobURL, err := b.getBlobURL(objectKey, false)
225231
if err != nil {
226232
return err
227233
}
228234

229-
_, err = blockBlobURL.GetProperties(ctx, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
230-
return err
235+
response, err := blockBlobURL.GetProperties(ctx, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
236+
if err != nil {
237+
return err
238+
}
239+
if response != nil {
240+
rawResponse := response.Response()
241+
if rawResponse != nil {
242+
objectSize = rawResponse.ContentLength
243+
}
244+
}
245+
return nil
231246
})
232247
if err != nil {
233-
return false, err
248+
return false, 0, err
234249
}
235250

236-
return true, nil
251+
return true, objectSize, nil
237252
}
238253

239254
// GetObject returns a reader and the size for the specified object key.

pkg/storage/chunk/client/baidubce/bos_storage_client.go

+16-5
Original file line numberDiff line numberDiff line change
@@ -91,16 +91,27 @@ func (b *BOSObjectStorage) PutObject(ctx context.Context, objectKey string, obje
9191
}
9292

9393
func (b *BOSObjectStorage) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
94+
exists, _, err := b.ObjectExistsWithSize(ctx, objectKey)
95+
return exists, err
96+
}
97+
98+
func (b *BOSObjectStorage) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
99+
var objectSize int64
94100
err := instrument.CollectedRequest(ctx, "BOS.ObjectExists", bosRequestDuration, instrument.ErrorCode, func(_ context.Context) error {
95-
var requestErr error
96-
_, requestErr = b.client.GetObjectMeta(b.cfg.BucketName, objectKey)
97-
return requestErr
101+
metaResult, requestErr := b.client.GetObjectMeta(b.cfg.BucketName, objectKey)
102+
if requestErr != nil {
103+
return requestErr
104+
}
105+
if metaResult != nil {
106+
objectSize = metaResult.ContentLength
107+
}
108+
return nil
98109
})
99110
if err != nil {
100-
return false, err
111+
return false, 0, err
101112
}
102113

103-
return true, nil
114+
return true, objectSize, nil
104115
}
105116

106117
func (b *BOSObjectStorage) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {

pkg/storage/chunk/client/congestion/controller.go

+7
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,10 @@ func (a *AIMDController) ObjectExists(ctx context.Context, objectKey string) (bo
145145
return a.inner.ObjectExists(ctx, objectKey)
146146
}
147147

148+
func (a *AIMDController) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
149+
return a.inner.ObjectExistsWithSize(ctx, objectKey)
150+
}
151+
148152
func (a *AIMDController) DeleteObject(ctx context.Context, objectKey string) error {
149153
return a.inner.DeleteObject(ctx, objectKey)
150154
}
@@ -212,6 +216,9 @@ func NewNoopController(Config) *NoopController {
212216
return &NoopController{}
213217
}
214218

219+
func (n *NoopController) ObjectExistsWithSize(context.Context, string) (bool, int64, error) {
220+
return true, 0, nil
221+
}
215222
func (n *NoopController) ObjectExists(context.Context, string) (bool, error) { return true, nil }
216223
func (n *NoopController) PutObject(context.Context, string, io.Reader) error { return nil }
217224
func (n *NoopController) GetObject(context.Context, string) (io.ReadCloser, int64, error) {

pkg/storage/chunk/client/congestion/controller_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,10 @@ func (m *mockObjectClient) ObjectExists(context.Context, string) (bool, error) {
267267
panic("not implemented")
268268
}
269269

270+
func (m *mockObjectClient) ObjectExistsWithSize(context.Context, string) (bool, int64, error) {
271+
panic("not implemented")
272+
}
273+
270274
func (m *mockObjectClient) List(context.Context, string, string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
271275
panic("not implemented")
272276
}

pkg/storage/chunk/client/gcp/gcs_object_client.go

+11-3
Original file line numberDiff line numberDiff line change
@@ -127,12 +127,20 @@ func (s *GCSObjectClient) Stop() {
127127
}
128128

129129
func (s *GCSObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
130-
_, err := s.getsBuckets.Object(objectKey).Attrs(ctx)
130+
exists, _, err := s.ObjectExistsWithSize(ctx, objectKey)
131+
return exists, err
132+
}
133+
134+
func (s *GCSObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
135+
attrs, err := s.getsBuckets.Object(objectKey).Attrs(ctx)
131136
if err != nil {
132-
return false, err
137+
return false, 0, err
133138
}
134139

135-
return true, nil
140+
if attrs != nil {
141+
return true, attrs.Size, nil
142+
}
143+
return true, 0, nil
136144
}
137145

138146
// GetObject returns a reader and the size for the specified object key from the configured GCS bucket.

pkg/storage/chunk/client/ibmcloud/cos_object_client.go

+16-5
Original file line numberDiff line numberDiff line change
@@ -320,20 +320,31 @@ func (c *COSObjectClient) DeleteObject(ctx context.Context, objectKey string) er
320320
}
321321

322322
func (c *COSObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
323+
exists, _, err := c.ObjectExistsWithSize(ctx, objectKey)
324+
return exists, err
325+
}
326+
327+
func (c *COSObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
323328
bucket := c.bucketFromKey(objectKey)
329+
var objectSize int64
324330
err := instrument.CollectedRequest(ctx, "COS.GetObject", cosRequestDuration, instrument.ErrorCode, func(_ context.Context) error {
325-
var requestErr error
326-
_, requestErr = c.hedgedCOS.HeadObject(&cos.HeadObjectInput{
331+
headOutput, requestErr := c.hedgedCOS.HeadObject(&cos.HeadObjectInput{
327332
Bucket: ibm.String(bucket),
328333
Key: ibm.String(objectKey),
329334
})
330-
return requestErr
335+
if requestErr != nil {
336+
return requestErr
337+
}
338+
if headOutput != nil && headOutput.ContentLength != nil {
339+
objectSize = *headOutput.ContentLength
340+
}
341+
return nil
331342
})
332343
if err != nil {
333-
return false, err
344+
return false, 0, err
334345
}
335346

336-
return true, nil
347+
return true, objectSize, nil
337348
}
338349

339350
// GetObject returns a reader and the size for the specified object key from the configured S3 bucket.

pkg/storage/chunk/client/local/fs_object_client.go

+9-4
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,19 @@ func NewFSObjectClient(cfg FSConfig) (*FSObjectClient, error) {
6767
// Stop implements ObjectClient
6868
func (FSObjectClient) Stop() {}
6969

70-
func (f *FSObjectClient) ObjectExists(_ context.Context, objectKey string) (bool, error) {
70+
func (f *FSObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
71+
exists, _, err := f.ObjectExistsWithSize(ctx, objectKey)
72+
return exists, err
73+
}
74+
75+
func (f *FSObjectClient) ObjectExistsWithSize(_ context.Context, objectKey string) (bool, int64, error) {
7176
fullPath := filepath.Join(f.cfg.Directory, filepath.FromSlash(objectKey))
72-
_, err := os.Lstat(fullPath)
77+
fi, err := os.Lstat(fullPath)
7378
if err != nil {
74-
return false, err
79+
return false, 0, err
7580
}
7681

77-
return true, nil
82+
return true, fi.Size(), nil
7883
}
7984

8085
// GetObject from the store

pkg/storage/chunk/client/local/fs_object_client_test.go

+5
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,11 @@ func TestFSObjectClient_List_and_ObjectExists(t *testing.T) {
156156
ok, err := bucketClient.ObjectExists(context.Background(), "outer-file2")
157157
require.NoError(t, err)
158158
require.True(t, ok)
159+
160+
ok, objectSize, err := bucketClient.ObjectExistsWithSize(context.Background(), "outer-file2")
161+
require.NoError(t, err)
162+
require.True(t, ok)
163+
require.EqualValues(t, len("outer-file2"), objectSize)
159164
}
160165

161166
func TestFSObjectClient_DeleteObject(t *testing.T) {

pkg/storage/chunk/client/object_client.go

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
// ObjectClient is used to store arbitrary data in Object Store (S3/GCS/Azure/...)
2020
type ObjectClient interface {
2121
ObjectExists(ctx context.Context, objectKey string) (bool, error)
22+
ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error)
2223

2324
PutObject(ctx context.Context, objectKey string, object io.Reader) error
2425
// NOTE: The consumer of GetObject should always call the Close method when it is done reading which otherwise could cause a resource leak.

pkg/storage/chunk/client/openstack/swift_object_client.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -125,12 +125,17 @@ func (s *SwiftObjectClient) Stop() {
125125
}
126126

127127
func (s *SwiftObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
128-
_, _, err := s.hedgingConn.Object(ctx, s.cfg.Config.ContainerName, objectKey)
128+
exists, _, err := s.ObjectExistsWithSize(ctx, objectKey)
129+
return exists, err
130+
}
131+
132+
func (s *SwiftObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
133+
info, _, err := s.hedgingConn.Object(ctx, s.cfg.Config.ContainerName, objectKey)
129134
if err != nil {
130-
return false, err
135+
return false, 0, err
131136
}
132137

133-
return true, nil
138+
return true, info.Bytes, nil
134139
}
135140

136141
// GetObject returns a reader and the size for the specified object key from the configured swift container.

pkg/storage/chunk/client/prefixed_object_client.go

+4
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ func (p PrefixedObjectClient) ObjectExists(ctx context.Context, objectKey string
2323
return p.downstreamClient.ObjectExists(ctx, p.prefix+objectKey)
2424
}
2525

26+
func (p PrefixedObjectClient) ObjectExistsWithSize(ctx context.Context, objectKey string) (bool, int64, error) {
27+
return p.downstreamClient.ObjectExistsWithSize(ctx, p.prefix+objectKey)
28+
}
29+
2630
func (p PrefixedObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {
2731
return p.downstreamClient.GetObject(ctx, p.prefix+objectKey)
2832
}

0 commit comments

Comments
 (0)