Skip to content

Commit

Permalink
Refactor configmapprovider.Provider and Retrieved interfaces
Browse files Browse the repository at this point in the history
This uses a callback instead of a blocking WaitForUpdate function
to notify about config changes.

This simplifies the usage of the Provider, especially when watching
for changes is required.

A follow up PR will add watching capabilities to existing Provider
implementations.
  • Loading branch information
tigrannajaryan committed Nov 11, 2021
1 parent 93c9cab commit 3d76ad1
Show file tree
Hide file tree
Showing 15 changed files with 186 additions and 121 deletions.
32 changes: 19 additions & 13 deletions config/configmapprovider/default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

func TestDefaultMapProvider(t *testing.T) {
mp := NewDefault("testdata/default-config.yaml", nil)
retr, err := mp.Retrieve(context.Background())
retr, err := mp.Retrieve(context.Background(), nil)
require.NoError(t, err)

expectedMap, err := config.NewMapFromBuffer(strings.NewReader(`
Expand All @@ -37,14 +37,16 @@ exporters:
otlp:
endpoint: "localhost:4317"`))
require.NoError(t, err)
assert.Equal(t, expectedMap, retr.Get())
m, err := retr.Get(context.Background())
require.NoError(t, err)
assert.Equal(t, expectedMap, m)

assert.NoError(t, mp.Close(context.Background()))
assert.NoError(t, mp.Shutdown(context.Background()))
}

func TestDefaultMapProvider_AddNewConfig(t *testing.T) {
mp := NewDefault("testdata/default-config.yaml", []string{"processors.batch.timeout=2s"})
cp, err := mp.Retrieve(context.Background())
cp, err := mp.Retrieve(context.Background(), nil)
require.NoError(t, err)

expectedMap, err := config.NewMapFromBuffer(strings.NewReader(`
Expand All @@ -55,16 +57,18 @@ exporters:
otlp:
endpoint: "localhost:4317"`))
require.NoError(t, err)
assert.Equal(t, expectedMap, cp.Get())
m, err := cp.Get(context.Background())
require.NoError(t, err)
assert.Equal(t, expectedMap, m)

assert.NoError(t, mp.Close(context.Background()))
assert.NoError(t, mp.Shutdown(context.Background()))
}

func TestDefaultMapProvider_OverwriteConfig(t *testing.T) {
mp := NewDefault(
"testdata/default-config.yaml",
[]string{"processors.batch.timeout=2s", "exporters.otlp.endpoint=localhost:1234"})
cp, err := mp.Retrieve(context.Background())
cp, err := mp.Retrieve(context.Background(), nil)
require.NoError(t, err)

expectedMap, err := config.NewMapFromBuffer(strings.NewReader(`
Expand All @@ -75,24 +79,26 @@ exporters:
otlp:
endpoint: "localhost:1234"`))
require.NoError(t, err)
assert.Equal(t, expectedMap, cp.Get())
m, err := cp.Get(context.Background())
require.NoError(t, err)
assert.Equal(t, expectedMap, m)

assert.NoError(t, mp.Close(context.Background()))
assert.NoError(t, mp.Shutdown(context.Background()))
}

func TestDefaultMapProvider_InexistentFile(t *testing.T) {
mp := NewDefault("testdata/otelcol-config.yaml", nil)
require.NotNil(t, mp)
_, err := mp.Retrieve(context.Background())
_, err := mp.Retrieve(context.Background(), nil)
require.Error(t, err)

assert.NoError(t, mp.Close(context.Background()))
assert.NoError(t, mp.Shutdown(context.Background()))
}

func TestDefaultMapProvider_EmptyFileName(t *testing.T) {
mp := NewDefault("", nil)
_, err := mp.Retrieve(context.Background())
_, err := mp.Retrieve(context.Background(), nil)
require.Error(t, err)

assert.NoError(t, mp.Close(context.Background()))
assert.NoError(t, mp.Shutdown(context.Background()))
}
13 changes: 8 additions & 5 deletions config/configmapprovider/expand.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,23 @@ func NewExpand(base Provider) Provider {
}
}

func (emp *expandMapProvider) Retrieve(ctx context.Context) (Retrieved, error) {
retr, err := emp.base.Retrieve(ctx)
func (emp *expandMapProvider) Retrieve(ctx context.Context, onChange func(*ChangeEvent)) (Retrieved, error) {
retr, err := emp.base.Retrieve(ctx, onChange)
if err != nil {
return nil, err
}
cfgMap, err := retr.Get(ctx)
if err != nil {
return nil, err
}
cfgMap := retr.Get()
for _, k := range cfgMap.AllKeys() {
cfgMap.Set(k, expandStringValues(cfgMap.Get(k)))
}
return &simpleRetrieved{confMap: cfgMap}, nil
}

func (emp *expandMapProvider) Close(ctx context.Context) error {
return emp.base.Close(ctx)
func (emp *expandMapProvider) Shutdown(ctx context.Context) error {
return emp.base.Shutdown(ctx)
}

func expandStringValues(value interface{}) interface{} {
Expand Down
12 changes: 8 additions & 4 deletions config/configmapprovider/expand_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@ func TestExpand(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
// Retrieve the config
emp := NewExpand(NewFile(path.Join("testdata", test.name)))
cp, err := emp.Retrieve(context.Background())
cp, err := emp.Retrieve(context.Background(), nil)
require.NoError(t, err, "Unable to get config")

// Test that expanded configs are the same with the simple config with no env vars.
assert.Equal(t, expectedCfgMap.ToStringMap(), cp.Get().ToStringMap())
m, err := cp.Get(context.Background())
require.NoError(t, err)
assert.Equal(t, expectedCfgMap.ToStringMap(), m.ToStringMap())
})
}
}
Expand All @@ -75,7 +77,7 @@ func TestExpand_EscapedEnvVars(t *testing.T) {

// Retrieve the config
emp := NewExpand(NewFile(path.Join("testdata", "expand-escaped-env.yaml")))
cp, err := emp.Retrieve(context.Background())
cp, err := emp.Retrieve(context.Background(), nil)
require.NoError(t, err, "Unable to get config")

expectedMap := map[string]interface{}{
Expand All @@ -95,5 +97,7 @@ func TestExpand_EscapedEnvVars(t *testing.T) {
// escaped $ alone
"recv.7": "$",
}}
assert.Equal(t, expectedMap, cp.Get().ToStringMap())
m, err := cp.Get(context.Background())
require.NoError(t, err)
assert.Equal(t, expectedMap, m.ToStringMap())
}
4 changes: 2 additions & 2 deletions config/configmapprovider/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func NewFile(fileName string) Provider {
}
}

func (fmp *fileMapProvider) Retrieve(context.Context) (Retrieved, error) {
func (fmp *fileMapProvider) Retrieve(_ context.Context, _ func(*ChangeEvent)) (Retrieved, error) {
if fmp.fileName == "" {
return nil, errors.New("config file not specified")
}
Expand All @@ -46,6 +46,6 @@ func (fmp *fileMapProvider) Retrieve(context.Context) (Retrieved, error) {
return &simpleRetrieved{confMap: cp}, nil
}

func (*fileMapProvider) Close(context.Context) error {
func (*fileMapProvider) Shutdown(context.Context) error {
return nil
}
4 changes: 2 additions & 2 deletions config/configmapprovider/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ func NewInMemory(buf io.Reader) Provider {
return &inMemoryMapProvider{buf: buf}
}

func (inp *inMemoryMapProvider) Retrieve(context.Context) (Retrieved, error) {
func (inp *inMemoryMapProvider) Retrieve(_ context.Context, onChange func(*ChangeEvent)) (Retrieved, error) {
cfg, err := config.NewMapFromBuffer(inp.buf)
if err != nil {
return nil, err
}
return &simpleRetrieved{confMap: cfg}, nil
}

func (inp *inMemoryMapProvider) Close(context.Context) error {
func (inp *inMemoryMapProvider) Shutdown(context.Context) error {
return nil
}
14 changes: 9 additions & 5 deletions config/configmapprovider/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,28 @@ func NewMerge(ps ...Provider) Provider {
return &mergeMapProvider{providers: ps}
}

func (mp *mergeMapProvider) Retrieve(ctx context.Context) (Retrieved, error) {
func (mp *mergeMapProvider) Retrieve(ctx context.Context, onChange func(*ChangeEvent)) (Retrieved, error) {
retCfgMap := config.NewMap()
for _, p := range mp.providers {
retr, err := p.Retrieve(ctx)
retr, err := p.Retrieve(ctx, onChange)
if err != nil {
return nil, err
}
if err = retCfgMap.Merge(retr.Get()); err != nil {
cfgMap, err := retr.Get(ctx)
if err != nil {
return nil, err
}
if err = retCfgMap.Merge(cfgMap); err != nil {
return nil, err
}
}
return &simpleRetrieved{confMap: retCfgMap}, nil
}

func (mp *mergeMapProvider) Close(ctx context.Context) error {
func (mp *mergeMapProvider) Shutdown(ctx context.Context) error {
var errs error
for _, p := range mp.providers {
errs = multierr.Append(errs, p.Close(ctx))
errs = multierr.Append(errs, p.Shutdown(ctx))
}

return errs
Expand Down
8 changes: 4 additions & 4 deletions config/configmapprovider/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,28 @@ import (
func TestMerge_GetError(t *testing.T) {
pl := NewMerge(&errProvider{err: nil}, &errProvider{errors.New("my error")})
require.NotNil(t, pl)
cp, err := pl.Retrieve(context.Background())
cp, err := pl.Retrieve(context.Background(), nil)
assert.Error(t, err)
assert.Nil(t, cp)
}

func TestMerge_CloseError(t *testing.T) {
pl := NewMerge(&errProvider{err: nil}, &errProvider{errors.New("my error")})
require.NotNil(t, pl)
assert.Error(t, pl.Close(context.Background()))
assert.Error(t, pl.Shutdown(context.Background()))
}

type errProvider struct {
err error
}

func (epl *errProvider) Retrieve(context.Context) (Retrieved, error) {
func (epl *errProvider) Retrieve(context.Context, func(*ChangeEvent)) (Retrieved, error) {
if epl.err == nil {
return &simpleRetrieved{confMap: config.NewMap()}, nil
}
return nil, epl.err
}

func (epl *errProvider) Close(context.Context) error {
func (epl *errProvider) Shutdown(context.Context) error {
return epl.err
}
4 changes: 2 additions & 2 deletions config/configmapprovider/properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func NewProperties(properties []string) Provider {
}
}

func (pmp *propertiesMapProvider) Retrieve(context.Context) (Retrieved, error) {
func (pmp *propertiesMapProvider) Retrieve(_ context.Context, onChange func(*ChangeEvent)) (Retrieved, error) {
if len(pmp.properties) == 0 {
return &simpleRetrieved{confMap: config.NewMap()}, nil
}
Expand Down Expand Up @@ -70,6 +70,6 @@ func (pmp *propertiesMapProvider) Retrieve(context.Context) (Retrieved, error) {
return &simpleRetrieved{confMap: config.NewMapFromStringMap(prop)}, nil
}

func (*propertiesMapProvider) Close(context.Context) error {
func (*propertiesMapProvider) Shutdown(context.Context) error {
return nil
}
14 changes: 8 additions & 6 deletions config/configmapprovider/properties_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,25 @@ func TestPropertiesProvider(t *testing.T) {
}

pmp := NewProperties(setFlagStr)
retr, err := pmp.Retrieve(context.Background())
retr, err := pmp.Retrieve(context.Background(), nil)
require.NoError(t, err)
cfgMap, err := retr.Get(context.Background())
require.NoError(t, err)
cfgMap := retr.Get()
keys := cfgMap.AllKeys()
assert.Len(t, keys, 4)
assert.Equal(t, "2s", cfgMap.Get("processors::batch::timeout"))
assert.Equal(t, "3s", cfgMap.Get("processors::batch/foo::timeout"))
assert.Equal(t, "foo:9200,foo2:9200", cfgMap.Get("exporters::kafka::brokers"))
assert.Equal(t, "localhost:1818", cfgMap.Get("receivers::otlp::protocols::grpc::endpoint"))
require.NoError(t, pmp.Close(context.Background()))
require.NoError(t, pmp.Shutdown(context.Background()))
}

func TestPropertiesProvider_empty(t *testing.T) {
pmp := NewProperties(nil)
retr, err := pmp.Retrieve(context.Background())
retr, err := pmp.Retrieve(context.Background(), nil)
require.NoError(t, err)
cfgMap, err := retr.Get(context.Background())
require.NoError(t, err)
cfgMap := retr.Get()
assert.Equal(t, 0, len(cfgMap.AllKeys()))
require.NoError(t, pmp.Close(context.Background()))
require.NoError(t, pmp.Shutdown(context.Background()))
}
Loading

0 comments on commit 3d76ad1

Please sign in to comment.