Skip to content

Commit 4bc99d7

Browse files
authored
Merge pull request #1 from ashwanthgoli/iter-object-attributes-optional
Add support for iter with optional attributes
2 parents 79ce462 + 5581463 commit 4bc99d7

File tree

18 files changed

+492
-127
lines changed

18 files changed

+492
-127
lines changed

inmem.go

+16-2
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func (b *InMemBucket) Objects() map[string][]byte {
5050

5151
// Iter calls f for each entry in the given directory. The argument to f is the full
5252
// object name including the prefix of the inspected directory.
53-
func (b *InMemBucket) Iter(ctx context.Context, dir string, f func(name string, attrs ObjectAttributes) error, options ...IterOption) error {
53+
func (b *InMemBucket) Iter(_ context.Context, dir string, f func(string) error, options ...IterOption) error {
5454
unique := map[string]struct{}{}
5555
params := ApplyIterOptions(options...)
5656

@@ -99,13 +99,27 @@ func (b *InMemBucket) Iter(ctx context.Context, dir string, f func(name string,
9999
})
100100

101101
for _, k := range keys {
102-
if err := f(k, EmptyObjectAttributes); err != nil {
102+
if err := f(k); err != nil {
103103
return err
104104
}
105105
}
106106
return nil
107107
}
108108

109+
func (i *InMemBucket) SupportedIterOptions() []IterOptionType {
110+
return []IterOptionType{Recursive}
111+
}
112+
113+
func (b *InMemBucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs IterObjectAttributes) error, options ...IterOption) error {
114+
if err := ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil {
115+
return err
116+
}
117+
118+
return b.Iter(ctx, dir, func(name string) error {
119+
return f(IterObjectAttributes{Name: name})
120+
}, options...)
121+
}
122+
109123
// Get returns a reader for the given object name.
110124
func (b *InMemBucket) Get(_ context.Context, name string) (io.ReadCloser, error) {
111125
if name == "" {

objstore.go

+93-19
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@ package objstore
66
import (
77
"bytes"
88
"context"
9+
"fmt"
910
"io"
1011
"io/fs"
1112
"os"
1213
"path"
1314
"path/filepath"
15+
"slices"
1416
"strings"
1517
"sync"
1618
"time"
@@ -34,8 +36,6 @@ const (
3436
OpAttributes = "attributes"
3537
)
3638

37-
var EmptyObjectAttributes = ObjectAttributes{}
38-
3939
// Bucket provides read and write access to an object storage bucket.
4040
// NOTE: We assume strong consistency for write-read flow.
4141
type Bucket interface {
@@ -70,12 +70,21 @@ type InstrumentedBucket interface {
7070

7171
// BucketReader provides read access to an object storage bucket.
7272
type BucketReader interface {
73-
// Iter calls f for each entry in the given directory (not recursive.). The first argument to f is the full
74-
// object name including the prefix of the inspected directory. The second argument are the object attributes
75-
// returned by the underlying provider. Attributes can be requested using various IterOption's.
76-
//
73+
// Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full
74+
// object name including the prefix of the inspected directory.
75+
7776
// Entries are passed to function in sorted order.
78-
Iter(ctx context.Context, dir string, f func(name string, attrs ObjectAttributes) error, options ...IterOption) error
77+
Iter(ctx context.Context, dir string, f func(name string) error, options ...IterOption) error
78+
79+
// IterWithAttributes calls f for each entry in the given directory similar to Iter.
80+
// In addition to Name, it also includes requested object attributes in the argument to f.
81+
//
82+
// Attributes can be requested using IterOption.
83+
// Not all IterOptions are supported by all providers, requesting for an unsupported option will fail with ErrOptionNotSupported.
84+
IterWithAttributes(ctx context.Context, dir string, f func(attrs IterObjectAttributes) error, options ...IterOption) error
85+
86+
// SupportedIterOptions returns a list of supported IterOptions by the underlying provider.
87+
SupportedIterOptions() []IterOptionType
7988

8089
// Get returns a reader for the given object name.
8190
Get(ctx context.Context, name string) (io.ReadCloser, error)
@@ -105,32 +114,66 @@ type InstrumentedBucketReader interface {
105114
ReaderWithExpectedErrs(IsOpFailureExpectedFunc) BucketReader
106115
}
107116

117+
var ErrOptionNotSupported = errors.New("iter option is not supported")
118+
119+
// IterOptionType is used for type-safe option support checking
120+
type IterOptionType int
121+
122+
const (
123+
Recursive IterOptionType = iota
124+
UpdatedAt
125+
)
126+
108127
// IterOption configures the provided params.
109-
type IterOption func(params *IterParams)
128+
type IterOption struct {
129+
Type IterOptionType
130+
Apply func(params *IterParams)
131+
}
110132

111133
// WithRecursiveIter is an option that can be applied to Iter() to recursively list objects
112134
// in the bucket.
113-
func WithRecursiveIter(params *IterParams) {
114-
params.Recursive = true
135+
func WithRecursiveIter() IterOption {
136+
return IterOption{
137+
Type: Recursive,
138+
Apply: func(params *IterParams) {
139+
params.Recursive = true
140+
},
141+
}
115142
}
116143

117144
// WithUpdatedAt is an option that can be applied to Iter() to
118-
// return the updated time attribute of each object.
119-
// This option is currently supported for the GCS and Filesystem providers.
120-
func WithUpdatedAt(params *IterParams) {
121-
params.WithUpdatedAt = true
145+
// include the last modified time in the attributes.
146+
// NB: Prefixes may not report last modified time.
147+
// This option is currently supported for the azure, aws, bos, gcs and filesystem providers.
148+
func WithUpdatedAt() IterOption {
149+
return IterOption{
150+
Type: UpdatedAt,
151+
Apply: func(params *IterParams) {
152+
params.LastModified = true
153+
},
154+
}
122155
}
123156

124157
// IterParams holds the Iter() parameters and is used by objstore clients implementations.
125158
type IterParams struct {
126-
Recursive bool
127-
WithUpdatedAt bool
159+
Recursive bool
160+
LastModified bool
161+
}
162+
163+
func ValidateIterOptions(supportedOptions []IterOptionType, options ...IterOption) error {
164+
for _, opt := range options {
165+
if !slices.Contains(supportedOptions, opt.Type) {
166+
return fmt.Errorf("%w: %v", ErrOptionNotSupported, opt.Type)
167+
}
168+
}
169+
170+
return nil
128171
}
129172

130173
func ApplyIterOptions(options ...IterOption) IterParams {
131174
out := IterParams{}
132175
for _, opt := range options {
133-
opt(&out)
176+
opt.Apply(&out)
134177
}
135178
return out
136179
}
@@ -201,6 +244,20 @@ type ObjectAttributes struct {
201244
LastModified time.Time `json:"last_modified"`
202245
}
203246

247+
type IterObjectAttributes struct {
248+
Name string
249+
lastModified time.Time
250+
}
251+
252+
func (i *IterObjectAttributes) SetLastModified(t time.Time) {
253+
i.lastModified = t
254+
}
255+
256+
// LastModified returns the timestamp the object was last modified. Returns false if the timestamp is not available.
257+
func (i *IterObjectAttributes) LastModified() (time.Time, bool) {
258+
return i.lastModified, !i.lastModified.IsZero()
259+
}
260+
204261
// TryToGetSize tries to get upfront size from reader.
205262
// Some implementations may return only size of unread data in the reader, so it's best to call this method before
206263
// doing any reading.
@@ -359,7 +416,7 @@ func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, origi
359416
var downloadedFiles []string
360417
var m sync.Mutex
361418

362-
err := bkt.Iter(ctx, src, func(name string, _ ObjectAttributes) error {
419+
err := bkt.Iter(ctx, src, func(name string) error {
363420
g.Go(func() error {
364421
dst := filepath.Join(dst, filepath.Base(name))
365422
if strings.HasSuffix(name, DirDelim) {
@@ -495,7 +552,7 @@ func (b *metricBucket) ReaderWithExpectedErrs(fn IsOpFailureExpectedFunc) Bucket
495552
return b.WithExpectedErrs(fn)
496553
}
497554

498-
func (b *metricBucket) Iter(ctx context.Context, dir string, f func(name string, _ ObjectAttributes) error, options ...IterOption) error {
555+
func (b *metricBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error {
499556
const op = OpIter
500557
b.ops.WithLabelValues(op).Inc()
501558

@@ -508,6 +565,23 @@ func (b *metricBucket) Iter(ctx context.Context, dir string, f func(name string,
508565
return err
509566
}
510567

568+
func (b *metricBucket) IterWithAttributes(ctx context.Context, dir string, f func(IterObjectAttributes) error, options ...IterOption) error {
569+
const op = OpIter
570+
b.ops.WithLabelValues(op).Inc()
571+
572+
err := b.bkt.IterWithAttributes(ctx, dir, f, options...)
573+
if err != nil {
574+
if !b.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
575+
b.opsFailures.WithLabelValues(op).Inc()
576+
}
577+
}
578+
return err
579+
}
580+
581+
func (b *metricBucket) SupportedIterOptions() []IterOptionType {
582+
return b.SupportedIterOptions()
583+
}
584+
511585
func (b *metricBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) {
512586
const op = OpAttributes
513587
b.ops.WithLabelValues(op).Inc()

prefixed_bucket.go

+16-3
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,27 @@ func (p *PrefixedBucket) Close() error {
4646
// Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full
4747
// object name including the prefix of the inspected directory.
4848
// Entries are passed to function in sorted order.
49-
func (p *PrefixedBucket) Iter(ctx context.Context, dir string, f func(name string, attrs ObjectAttributes) error, options ...IterOption) error {
49+
func (p *PrefixedBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error {
5050
pdir := withPrefix(p.prefix, dir)
5151

52-
return p.bkt.Iter(ctx, pdir, func(s string, _ ObjectAttributes) error {
53-
return f(strings.TrimPrefix(s, p.prefix+DirDelim), EmptyObjectAttributes)
52+
return p.bkt.Iter(ctx, pdir, func(s string) error {
53+
return f(strings.TrimPrefix(s, p.prefix+DirDelim))
5454
}, options...)
5555
}
5656

57+
func (p *PrefixedBucket) IterWithAttributes(ctx context.Context, dir string, f func(IterObjectAttributes) error, options ...IterOption) error {
58+
pdir := withPrefix(p.prefix, dir)
59+
60+
return p.bkt.IterWithAttributes(ctx, pdir, func(attrs IterObjectAttributes) error {
61+
attrs.Name = strings.TrimPrefix(attrs.Name, p.prefix+DirDelim)
62+
return f(attrs)
63+
}, options...)
64+
}
65+
66+
func (p *PrefixedBucket) SupportedIterOptions() []IterOptionType {
67+
return p.SupportedIterOptions()
68+
}
69+
5770
// Get returns a reader for the given object name.
5871
func (p *PrefixedBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
5972
return p.bkt.Get(ctx, conditionalPrefix(p.prefix, name))

prefixed_bucket_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -71,17 +71,17 @@ func UsesPrefixTest(t *testing.T, bkt Bucket, prefix string) {
7171

7272
testutil.Ok(t, bkt.Upload(context.Background(), strings.Trim(prefix, "/")+"/dir/file1.jpg", strings.NewReader("test-data1")))
7373
seen := []string{}
74-
testutil.Ok(t, pBkt.Iter(context.Background(), "", func(fn string, _ ObjectAttributes) error {
74+
testutil.Ok(t, pBkt.Iter(context.Background(), "", func(fn string) error {
7575
seen = append(seen, fn)
7676
return nil
77-
}, WithRecursiveIter))
77+
}, WithRecursiveIter()))
7878
expected := []string{"dir/file1.jpg", "file1.jpg"}
7979
sort.Strings(expected)
8080
sort.Strings(seen)
8181
testutil.Equals(t, expected, seen)
8282

8383
seen = []string{}
84-
testutil.Ok(t, pBkt.Iter(context.Background(), "", func(fn string, _ ObjectAttributes) error {
84+
testutil.Ok(t, pBkt.Iter(context.Background(), "", func(fn string) error {
8585
seen = append(seen, fn)
8686
return nil
8787
}))

providers/azure/azure.go

+41-6
Original file line numberDiff line numberDiff line change
@@ -180,9 +180,15 @@ func NewBucketWithConfig(logger log.Logger, conf Config, component string) (*Buc
180180
return bkt, nil
181181
}
182182

183-
// Iter calls f for each entry in the given directory. The argument to f is the full
184-
// object name including the prefix of the inspected directory.
185-
func (b *Bucket) Iter(ctx context.Context, dir string, f func(name string, _ objstore.ObjectAttributes) error, options ...objstore.IterOption) error {
183+
func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType {
184+
return []objstore.IterOptionType{objstore.Recursive, objstore.UpdatedAt}
185+
}
186+
187+
func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error {
188+
if err := objstore.ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil {
189+
return err
190+
}
191+
186192
prefix := dir
187193
if prefix != "" && !strings.HasSuffix(prefix, DirDelim) {
188194
prefix += DirDelim
@@ -198,7 +204,13 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(name string, _ obj
198204
return err
199205
}
200206
for _, blob := range resp.Segment.BlobItems {
201-
if err := f(*blob.Name, objstore.EmptyObjectAttributes); err != nil {
207+
attrs := objstore.IterObjectAttributes{
208+
Name: *blob.Name,
209+
}
210+
if params.LastModified {
211+
attrs.SetLastModified(*blob.Properties.LastModified)
212+
}
213+
if err := f(attrs); err != nil {
202214
return err
203215
}
204216
}
@@ -214,19 +226,42 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(name string, _ obj
214226
return err
215227
}
216228
for _, blobItem := range resp.Segment.BlobItems {
217-
if err := f(*blobItem.Name, objstore.EmptyObjectAttributes); err != nil {
229+
attrs := objstore.IterObjectAttributes{
230+
Name: *blobItem.Name,
231+
}
232+
if params.LastModified {
233+
attrs.SetLastModified(*blobItem.Properties.LastModified)
234+
}
235+
if err := f(attrs); err != nil {
218236
return err
219237
}
220238
}
221239
for _, blobPrefix := range resp.Segment.BlobPrefixes {
222-
if err := f(*blobPrefix.Name, objstore.EmptyObjectAttributes); err != nil {
240+
if err := f(objstore.IterObjectAttributes{Name: *blobPrefix.Name}); err != nil {
223241
return err
224242
}
225243
}
226244
}
227245
return nil
228246
}
229247

248+
// Iter calls f for each entry in the given directory. The argument to f is the full
249+
// object name including the prefix of the inspected directory.
250+
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opts ...objstore.IterOption) error {
251+
// Only include recursive option since attributes are not used in this method.
252+
var filteredOpts []objstore.IterOption
253+
for _, opt := range opts {
254+
if opt.Type == objstore.Recursive {
255+
filteredOpts = append(filteredOpts, opt)
256+
break
257+
}
258+
}
259+
260+
return b.IterWithAttributes(ctx, dir, func(attrs objstore.IterObjectAttributes) error {
261+
return f(attrs.Name)
262+
}, filteredOpts...)
263+
}
264+
230265
// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
231266
func (b *Bucket) IsObjNotFoundErr(err error) bool {
232267
if err == nil {

0 commit comments

Comments
 (0)