Skip to content

Commit

Permalink
Move service.mapResolver to confmap.Resolver
Browse files Browse the repository at this point in the history
The reason to do this is to allow alternative implementation of the service.ConfigProvider to re-use the resolver logic,
also to create an independent "confmap.Conf" resolver for other services/binaries to use.

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed May 31, 2022
1 parent 3356863 commit 144a8f4
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 167 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

### 💡 Enhancements 💡

- Move `service.mapResolver` to `confmap.Resolver` (#5444)

### 🧰 Bug fixes 🧰

## v0.52.0 Beta
Expand Down
26 changes: 13 additions & 13 deletions confmap/README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# High Level Design

This document is work in progress, some concepts are not yet available
(e.g. MapResolver is a private concept in the service for the moment).
(e.g. Resolver is a private concept in the service for the moment).

## ConfMap
## Conf

The [ConfMap](confmap.go) represents the raw configuration for a service (e.g. OpenTelemetry Collector).
The [Conf](confmap.go) represents the raw configuration for a service (e.g. OpenTelemetry Collector).

## Provider

Expand All @@ -21,21 +21,21 @@ characters long to avoid conflicting with a driver-letter identifier as specifie
The [Converter](converter.go) allows implementing conversion logic for the provided configuration. One of the most
common use-case is to migrate/transform the configuration after a backwards incompatible change.

## MapResolver
## Resolver

The `MapResolver` handles the use of multiple [Providers](#provider) and [Converters](#converter)
The `Resolver` handles the use of multiple [Providers](#provider) and [Converters](#converter)
simplifying configuration parsing, monitoring for updates, and the overall life-cycle of the used config providers.
The `MapResolver` provides two main functionalities: [Configuration Resolving](#configuration-resolving) and
The `Resolver` provides two main functionalities: [Configuration Resolving](#configuration-resolving) and
[Watching for Updates](#watching-for-updates).

### Configuration Resolving

The `MapResolver` receives as input a set of `Providers`, a list of `Converters`, and a list of configuration identifier
`configURI` that will be used to generate the resulting, or effective, configuration in the form of a `config.Map`,
The `Resolver` receives as input a set of `Providers`, a list of `Converters`, and a list of configuration identifier
`configURI` that will be used to generate the resulting, or effective, configuration in the form of a `Conf`,
that can be used by code that is oblivious to the usage of `Providers` and `Converters`.

```terminal
MapResolver Provider
Resolver Provider
│ │
Resolve │ │
────────────────►│ │
Expand Down Expand Up @@ -63,17 +63,17 @@ that can be used by code that is oblivious to the usage of `Providers` and `Conv

The `Resolve` method proceeds in the following steps:

1. Start with an empty "result" of `config.Map` type.
1. Start with an empty "result" of `Conf` type.
2. For each config URI retrieves individual configurations, and merges it into the "result".
2. For each "Converter", call "Convert" for the "result".
4. Return the "result", aka effective, configuration.

### Watching for Updates
After the configuration was processed, the `MapResolver` can be used as a single point to watch for updates in the
After the configuration was processed, the `Resolver` can be used as a single point to watch for updates in the
configuration retrieved via the `Provider` used to retrieve the “initial” configuration and to generate the “effective” one.

```terminal
MapResolver Provider
Resolver Provider
│ │
Watch │ │
───────────►│ │
Expand All @@ -86,4 +86,4 @@ configuration retrieved via the `Provider` used to retrieve the “initial” co
◄───────────┤ │
```

The `MapResolver` does that by passing an `onChange` func to each `Provider.Retrieve` call and capturing all watch events.
The `Resolver` does that by passing an `onChange` func to each `Provider.Retrieve` call and capturing all watch events.
32 changes: 13 additions & 19 deletions confmap/confmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package confmap

import (
"errors"
"fmt"
"io/ioutil"
"path/filepath"
"strings"
Expand Down Expand Up @@ -104,28 +103,12 @@ func TestToStringMap(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
parser, err := newMapFromFile(test.fileName)
require.NoError(t, err)
assert.Equal(t, test.stringMap, parser.ToStringMap())
conf := newConfFromFile(t, test.fileName)
assert.Equal(t, test.stringMap, conf.ToStringMap())
})
}
}

// newMapFromFile creates a new confmap.Conf by reading the given file.
func newMapFromFile(fileName string) (*Conf, error) {
content, err := ioutil.ReadFile(filepath.Clean(fileName))
if err != nil {
return nil, fmt.Errorf("unable to read the file %v: %w", fileName, err)
}

var data map[string]interface{}
if err = yaml.Unmarshal(content, &data); err != nil {
return nil, fmt.Errorf("unable to parse yaml: %w", err)
}

return NewFromStringMap(data), nil
}

func TestExpandNilStructPointersHookFunc(t *testing.T) {
stringMap := map[string]interface{}{
"boolean": nil,
Expand Down Expand Up @@ -236,3 +219,14 @@ func TestMapKeyStringToMapKeyTextUnmarshalerHookFuncErrorUnmarshal(t *testing.T)
cfg := &TestIDConfig{}
assert.Error(t, conf.UnmarshalExact(cfg))
}

// newConfFromFile creates a new Conf by reading the given file.
func newConfFromFile(t *testing.T, fileName string) *Conf {
content, err := ioutil.ReadFile(filepath.Clean(fileName))
require.NoErrorf(t, err, "unable to read the file %v", fileName)

var data map[string]interface{}
require.NoError(t, yaml.Unmarshal(content, &data), "unable to parse yaml")

return NewFromStringMap(data)
}
86 changes: 39 additions & 47 deletions service/mapresolver.go → confmap/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package service // import "go.opentelemetry.io/collector/service"
package confmap // import "go.opentelemetry.io/collector/service"

import (
"context"
Expand All @@ -23,83 +23,86 @@ import (
"sync"

"go.uber.org/multierr"

"go.opentelemetry.io/collector/config/experimental/configsource"
"go.opentelemetry.io/collector/confmap"
)

// follows drive-letter specification:
// https://tools.ietf.org/id/draft-kerwin-file-scheme-07.html#syntax
var driverLetterRegexp = regexp.MustCompile("^[A-z]:")

// mapResolver resolves a configuration as a confmap.Conf.
type mapResolver struct {
// Resolver resolves a configuration as a Conf.
type Resolver struct {
uris []string
providers map[string]confmap.Provider
converters []confmap.Converter
providers map[string]Provider
converters []Converter

sync.Mutex
closers []confmap.CloseFunc
closers []CloseFunc
watcher chan error
}

// newMapResolver returns a new mapResolver that resolves configuration from multiple URIs.
type ResolverSettings struct {
URIs []string
Providers map[string]Provider
Converters []Converter
}

// NewResolver returns a new Resolver that resolves configuration from multiple URIs.
//
// To resolve a configuration the following steps will happen:
// 1. Retrieves individual configurations from all given "URIs", and merge them in the retrieve order.
// 2. Once the confmap.Conf is merged, apply the converters in the given order.
// 2. Once the Conf is merged, apply the converters in the given order.
//
// After the configuration was resolved the `mapResolver` can be used as a single point to watch for updates in
// After the configuration was resolved the `Resolver` can be used as a single point to watch for updates in
// the configuration data retrieved via the config providers used to process the "initial" configuration and to generate
// the "effective" one. The typical usage is the following:
//
// mapResolver.Resolve(ctx)
// mapResolver.Watch() // wait for an event.
// mapResolver.Resolve(ctx)
// mapResolver.Watch() // wait for an event.
// Resolver.Resolve(ctx)
// Resolver.Watch() // wait for an event.
// Resolver.Resolve(ctx)
// Resolver.Watch() // wait for an event.
// // repeat Resolve/Watch cycle until it is time to shut down the Collector process.
// mapResolver.Shutdown(ctx)
// Resolver.Shutdown(ctx)
//
// `uri` must follow the "<scheme>:<opaque_data>" format. This format is compatible with the URI definition
// (see https://datatracker.ietf.org/doc/html/rfc3986). An empty "<scheme>" defaults to "file" schema.
func newMapResolver(uris []string, providers map[string]confmap.Provider, converters []confmap.Converter) (*mapResolver, error) {
if len(uris) == 0 {
func NewResolver(set ResolverSettings) (*Resolver, error) {
if len(set.URIs) == 0 {
return nil, errors.New("invalid map resolver config: no URIs")
}

if len(providers) == 0 {
return nil, errors.New("invalid map resolver config: no map providers")
if len(set.Providers) == 0 {
return nil, errors.New("invalid map resolver config: no providers")
}

// Safe copy, ensures the slices and maps cannot be changed from the caller.
urisCopy := make([]string, len(uris))
copy(urisCopy, uris)
providersCopy := make(map[string]confmap.Provider, len(providers))
for k, v := range providers {
urisCopy := make([]string, len(set.URIs))
copy(urisCopy, set.URIs)
providersCopy := make(map[string]Provider, len(set.Providers))
for k, v := range set.Providers {
providersCopy[k] = v
}
convertersCopy := make([]confmap.Converter, len(converters))
copy(convertersCopy, converters)
convertersCopy := make([]Converter, len(set.Converters))
copy(convertersCopy, set.Converters)

return &mapResolver{
return &Resolver{
uris: urisCopy,
providers: providersCopy,
converters: convertersCopy,
watcher: make(chan error, 1),
}, nil
}

// Resolve returns the configuration as a confmap.Conf, or error otherwise.
// Resolve returns the configuration as a Conf, or error otherwise.
//
// Should never be called concurrently with itself, Watch or Shutdown.
func (mr *mapResolver) Resolve(ctx context.Context) (*confmap.Conf, error) {
func (mr *Resolver) Resolve(ctx context.Context) (*Conf, error) {
// First check if already an active watching, close that if any.
if err := mr.closeIfNeeded(ctx); err != nil {
return nil, fmt.Errorf("cannot close previous watch: %w", err)
}

// Retrieves individual configurations from all URIs in the given order, and merge them in retMap.
retMap := confmap.New()
retMap := New()
for _, uri := range mr.uris {
// For backwards compatibility:
// - empty url scheme means "file".
Expand Down Expand Up @@ -145,15 +148,15 @@ func (mr *mapResolver) Resolve(ctx context.Context) (*confmap.Conf, error) {
// error indicates that there was a problem with watching the configuration changes.
//
// Should never be called concurrently with itself or Get.
func (mr *mapResolver) Watch() <-chan error {
func (mr *Resolver) Watch() <-chan error {
return mr.watcher
}

// Shutdown signals that the provider is no longer in use and the that should close
// and release any resources that it may have created. It terminates the Watch channel.
//
// Should never be called concurrently with itself or Get.
func (mr *mapResolver) Shutdown(ctx context.Context) error {
func (mr *Resolver) Shutdown(ctx context.Context) error {
close(mr.watcher)

var errs error
Expand All @@ -165,25 +168,14 @@ func (mr *mapResolver) Shutdown(ctx context.Context) error {
return errs
}

func (mr *mapResolver) onChange(event *confmap.ChangeEvent) {
// TODO: Remove check for configsource.ErrSessionClosed when providers updated to not call onChange when closed.
if !errors.Is(event.Error, configsource.ErrSessionClosed) {
mr.watcher <- event.Error
}
func (mr *Resolver) onChange(event *ChangeEvent) {
mr.watcher <- event.Error
}

func (mr *mapResolver) closeIfNeeded(ctx context.Context) error {
func (mr *Resolver) closeIfNeeded(ctx context.Context) error {
var err error
for _, ret := range mr.closers {
err = multierr.Append(err, ret(ctx))
}
return err
}

func makeMapProvidersMap(providers ...confmap.Provider) map[string]confmap.Provider {
ret := make(map[string]confmap.Provider, len(providers))
for _, provider := range providers {
ret[provider.Scheme()] = provider
}
return ret
}
Loading

0 comments on commit 144a8f4

Please sign in to comment.