Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: introduce add, update and delete operations to sync providers #307

Merged
3 changes: 2 additions & 1 deletion pkg/eval/ievaluator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package eval

import (
"github.com/open-feature/flagd/pkg/sync"
"google.golang.org/protobuf/types/known/structpb"
)

Expand Down Expand Up @@ -40,7 +41,7 @@ do parsing and validation of the flag state and evaluate flags in response to ha
*/
type IEvaluator interface {
GetState() (string, error)
SetState(source string, state string) (map[string]interface{}, error)
SetState(payload sync.DataSync) (map[string]interface{}, error)

ResolveBooleanValue(
reqID string,
Expand Down
90 changes: 64 additions & 26 deletions pkg/eval/json_evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ import (
"errors"
"fmt"
"regexp"
"strconv"
"strings"

"github.com/open-feature/flagd/pkg/sync"

"github.com/diegoholiveira/jsonlogic/v3"
"github.com/open-feature/flagd/pkg/logger"
"github.com/open-feature/flagd/pkg/model"
Expand Down Expand Up @@ -42,6 +45,9 @@ func NewJSONEvaluator(logger *logger.Logger) *JSONEvaluator {
zap.String("component", "evaluator"),
zap.String("evaluator", "json"),
),
state: Flags{
Flags: map[string]Flag{},
},
}
jsonlogic.AddOperator("fractionalEvaluation", ev.fractionalEvaluation)
return &ev
Expand All @@ -55,36 +61,25 @@ func (je *JSONEvaluator) GetState() (string, error) {
return string(data), nil
}

func (je *JSONEvaluator) SetState(source string, state string) (map[string]interface{}, error) {
schemaLoader := gojsonschema.NewStringLoader(schema.FlagdDefinitions)
flagStringLoader := gojsonschema.NewStringLoader(state)
result, err := gojsonschema.Validate(schemaLoader, flagStringLoader)

if err != nil {
return nil, err
} else if !result.Valid() {
err := errors.New("invalid JSON file")
return nil, err
}

state, err = je.transposeEvaluators(state)
if err != nil {
return nil, fmt.Errorf("transpose evaluators: %w", err)
}

func (je *JSONEvaluator) SetState(payload sync.DataSync) (map[string]interface{}, error) {
var newFlags Flags
err = json.Unmarshal([]byte(state), &newFlags)
err := je.configToFlags(payload.FlagData, &newFlags)
if err != nil {
return nil, fmt.Errorf("unmarshal new state: %w", err)
}
if err := validateDefaultVariants(newFlags); err != nil {
return nil, err
}

s, notifications := je.state.Merge(je.Logger, source, newFlags)
je.state = s

return notifications, nil
switch payload.Type {
case sync.ALL:
return je.state.Merge(je.Logger, payload.Source, newFlags), nil
case sync.ADD:
return je.state.Add(je.Logger, payload.Source, newFlags), nil
case sync.UPDATE:
return je.state.Update(je.Logger, payload.Source, newFlags), nil
case sync.DELETE:
return je.state.Delete(je.Logger, payload.Source, newFlags), nil
default:
return nil, fmt.Errorf("unsupported sync type: %d", payload.Type)
}
}

func resolve[T constraints](reqID string, key string, context *structpb.Struct,
Expand Down Expand Up @@ -274,8 +269,36 @@ func (je *JSONEvaluator) evaluateVariant(
return je.state.Flags[flagKey].DefaultVariant, reason, nil
}

// configToFlags convert string configurations to flags and store them to pointer newFlags
func (je *JSONEvaluator) configToFlags(config string, newFlags *Flags) error {
schemaLoader := gojsonschema.NewStringLoader(schema.FlagdDefinitions)
flagStringLoader := gojsonschema.NewStringLoader(config)

result, err := gojsonschema.Validate(schemaLoader, flagStringLoader)
if err != nil {
return err
} else if !result.Valid() {
return fmt.Errorf("JSON schema validation failed: %s", buildErrorString(result.Errors()))
}

transposedConfig, err := je.transposeEvaluators(config)
if err != nil {
return fmt.Errorf("transposing evaluators: %w", err)
}

err = json.Unmarshal([]byte(transposedConfig), &newFlags)
if err != nil {
return fmt.Errorf("unmarshalling provided configurations: %w", err)
}
if err := validateDefaultVariants(newFlags); err != nil {
return err
}

return nil
}

// validateDefaultVariants returns an error if any of the default variants aren't valid
func validateDefaultVariants(flags Flags) error {
func validateDefaultVariants(flags *Flags) error {
for name, flag := range flags.Flags {
if _, ok := flag.Variants[flag.DefaultVariant]; !ok {
return fmt.Errorf(
Expand Down Expand Up @@ -315,3 +338,18 @@ func (je *JSONEvaluator) transposeEvaluators(state string) (string, error) {

return state, nil
}

// buildErrorString efficiently converts json schema errors to a formatted string, usable for logging
func buildErrorString(errors []gojsonschema.ResultError) string {
var builder strings.Builder

for i, err := range errors {
builder.WriteByte(' ')
builder.WriteString(strconv.Itoa(i + 1))
builder.WriteByte(':')
builder.WriteString(err.String())
builder.WriteByte(' ')
}

return builder.String()
}
129 changes: 109 additions & 20 deletions pkg/eval/json_evaluator_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,45 +8,139 @@ import (
"github.com/open-feature/flagd/pkg/logger"
)

type Flags struct {
Flags map[string]Flag `json:"flags"`
type Flag struct {
State string `json:"state"`
DefaultVariant string `json:"defaultVariant"`
Variants map[string]any `json:"variants"`
Targeting json.RawMessage `json:"targeting,omitempty"`
Source string `json:"source"`
}

type Evaluators struct {
Evaluators map[string]json.RawMessage `json:"$evaluators"`
}

func (f Flags) Merge(logger *logger.Logger, source string, ff Flags) (Flags, map[string]interface{}) {
type Flags struct {
Flags map[string]Flag `json:"flags"`
}

// Add new flags from source. The implementation is not thread safe
func (f Flags) Add(logger *logger.Logger, source string, ff Flags) map[string]interface{} {
notifications := map[string]interface{}{}

for k, newFlag := range ff.Flags {
if storedFlag, ok := f.Flags[k]; ok && storedFlag.Source != source {
logger.Warn(fmt.Sprintf(
"flag with key %s from source %s already exist, overriding this with flag from source %s",
k,
storedFlag.Source,
source,
))
}

notifications[k] = map[string]interface{}{
"type": string(NotificationCreate),
"source": source,
}

// Store the new version of the flag
newFlag.Source = source
f.Flags[k] = newFlag
}

return notifications
}

// Update existing flags from source. The implementation is not thread safe
func (f Flags) Update(logger *logger.Logger, source string, ff Flags) map[string]interface{} {
notifications := map[string]interface{}{}
result := Flags{Flags: make(map[string]Flag)}

for k, flag := range ff.Flags {
if storedFlag, ok := f.Flags[k]; !ok {
logger.Warn(
fmt.Sprintf("failed to update the flag, flag with key %s from source %s does not exisit.",
k,
source))

continue
} else if storedFlag.Source != source {
logger.Warn(fmt.Sprintf(
"flag with key %s from source %s already exist, overriding this with flag from source %s",
k,
storedFlag.Source,
source,
))
}

notifications[k] = map[string]interface{}{
"type": string(NotificationUpdate),
"source": source,
}

flag.Source = source
f.Flags[k] = flag
}

return notifications
}

// Delete matching flags from source. The implementation is not thread safe
func (f Flags) Delete(logger *logger.Logger, source string, ff Flags) map[string]interface{} {
notifications := map[string]interface{}{}

for k := range ff.Flags {
if _, ok := f.Flags[k]; ok {
notifications[k] = map[string]interface{}{
"type": string(NotificationDelete),
"source": source,
}

delete(f.Flags, k)
} else {
logger.Warn(
fmt.Sprintf("failed to remove flag, flag with key %s from source %s does not exisit.",
k,
source))
}
}

return notifications
}

// Merge provided flags from source with currently stored flags. The implementation is not thread safe
func (f Flags) Merge(logger *logger.Logger, source string, ff Flags) map[string]interface{} {
notifications := map[string]interface{}{}

for k, v := range f.Flags {
if v.Source == source {
if _, ok := ff.Flags[k]; !ok {
// flag has been deleted
delete(f.Flags, k)
notifications[k] = map[string]interface{}{
"type": string(NotificationDelete),
"source": source,
}
continue
}
}
result.Flags[k] = v
}
for k, v := range ff.Flags {
v.Source = source
val, ok := result.Flags[k]

for k, newFlag := range ff.Flags {
newFlag.Source = source

storedFlag, ok := f.Flags[k]
if !ok {
notifications[k] = map[string]interface{}{
"type": string(NotificationCreate),
"source": source,
}
} else if !reflect.DeepEqual(val, v) {
if val.Source != source {
} else if !reflect.DeepEqual(storedFlag, newFlag) {
if storedFlag.Source != source {
logger.Warn(
fmt.Sprintf(
"key value: %s is duplicated across multiple sources this can lead to unexpected behavior: %s, %s",
k,
val.Source,
storedFlag.Source,
source,
),
)
Expand All @@ -56,15 +150,10 @@ func (f Flags) Merge(logger *logger.Logger, source string, ff Flags) (Flags, map
"source": source,
}
}
result.Flags[k] = v

// Store the new version of the flag
f.Flags[k] = newFlag
}
return result, notifications
}

type Flag struct {
State string `json:"state"`
DefaultVariant string `json:"defaultVariant"`
Variants map[string]any `json:"variants"`
Targeting json.RawMessage `json:"targeting,omitempty"`
Source string `json:"source"`
return notifications
}
Loading