From 270e96d28721b75f9e79fa7dcfd38f85cd62b6ca Mon Sep 17 00:00:00 2001 From: Deepak Date: Thu, 8 Oct 2020 01:05:06 +0530 Subject: [PATCH] Add url option for sampling strategies This change will let user to provide a URL to download sampling strategies. Default strategy is the fallback option if the URL is temporarily unavailable. Signed-off-by: Deepak --- .../sampling/strategystore/static/options.go | 2 +- .../strategystore/static/strategy_store.go | 104 ++++++++++--- .../static/strategy_store_test.go | 138 +++++++++++++++++- 3 files changed, 213 insertions(+), 31 deletions(-) diff --git a/plugin/sampling/strategystore/static/options.go b/plugin/sampling/strategystore/static/options.go index 0ef18fbf2ea..b7222b3a5c2 100644 --- a/plugin/sampling/strategystore/static/options.go +++ b/plugin/sampling/strategystore/static/options.go @@ -22,7 +22,7 @@ import ( ) const ( - // SamplingStrategiesFile contains the name of CLI opions for config file. + // SamplingStrategiesFile contains the name of CLI option for config file. SamplingStrategiesFile = "sampling.strategies-file" samplingStrategiesReloadInterval = "sampling.strategies-reload-interval" ) diff --git a/plugin/sampling/strategystore/static/strategy_store.go b/plugin/sampling/strategystore/static/strategy_store.go index 561e7cb91b4..82bdcf92d4e 100644 --- a/plugin/sampling/strategystore/static/strategy_store.go +++ b/plugin/sampling/strategystore/static/strategy_store.go @@ -21,7 +21,8 @@ import ( "encoding/json" "fmt" "io/ioutil" - "path/filepath" + "net/http" + "net/url" "sync/atomic" "time" @@ -31,6 +32,9 @@ import ( "github.com/jaegertracing/jaeger/thrift-gen/sampling" ) +// null represents "null" JSON value. +const null = "null" + type strategyStore struct { logger *zap.Logger @@ -45,6 +49,8 @@ type storedStrategies struct { serviceStrategies map[string]*sampling.SamplingStrategyResponse } +type strategyLoader func() ([]byte, error) + // NewStrategyStore creates a strategy store that holds static sampling strategies. func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, error) { ctx, cancelFunc := context.WithCancel(context.Background()) @@ -55,14 +61,15 @@ func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, er } h.storedStrategies.Store(defaultStrategies()) - strategies, err := loadStrategies(options.StrategiesFile) + loadFn := samplingStrategyLoader(options.StrategiesFile) + strategies, err := loadStrategies(loadFn) if err != nil { return nil, err } h.parseStrategies(strategies) if options.ReloadInterval > 0 { - go h.autoUpdateStrategies(options.ReloadInterval, options.StrategiesFile) + go h.autoUpdateStrategies(options.ReloadInterval, loadFn) } return h, nil } @@ -83,35 +90,86 @@ func (h *strategyStore) Close() { h.cancelFunc() } -func (h *strategyStore) autoUpdateStrategies(interval time.Duration, filePath string) { - lastValue := "" +func downloadSamplingStrategies(url string) ([]byte, error) { + resp, err := http.Get(url) + if err != nil { + return nil, fmt.Errorf("failed to download sampling strategies: %w", err) + } + + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + if resp.StatusCode == http.StatusServiceUnavailable { + return []byte("null"), nil + } + return nil, fmt.Errorf( + "receiving %s while downloading strategies file", + resp.Status, + ) + } + + buf := new(bytes.Buffer) + if _, err = buf.ReadFrom(resp.Body); err != nil { + return nil, fmt.Errorf("failed to read sampling strategies from downloaded JSON: %w", err) + } + return buf.Bytes(), nil +} + +func isURL(str string) bool { + u, err := url.Parse(str) + return err == nil && u.Scheme != "" && u.Host != "" +} + +func samplingStrategyLoader(strategiesFile string) strategyLoader { + if strategiesFile == "" { + return func() ([]byte, error) { + // Using null so that it un-marshals to nil pointer. + return []byte(null), nil + } + } + + if isURL(strategiesFile) { + return func() ([]byte, error) { + return downloadSamplingStrategies(strategiesFile) + } + } + + return func() ([]byte, error) { + currBytes, err := ioutil.ReadFile(strategiesFile) + if err != nil { + return nil, fmt.Errorf("failed to open strategies file: %w", err) + } + return currBytes, nil + } +} + +func (h *strategyStore) autoUpdateStrategies(interval time.Duration, loader strategyLoader) { + lastValue := null ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-ticker.C: - lastValue = h.reloadSamplingStrategyFile(filePath, lastValue) + lastValue = h.reloadSamplingStrategy(loader, lastValue) case <-h.ctx.Done(): return } } } -func (h *strategyStore) reloadSamplingStrategyFile(filePath string, lastValue string) string { - currBytes, err := ioutil.ReadFile(filepath.Clean(filePath)) +func (h *strategyStore) reloadSamplingStrategy(loadFn strategyLoader, lastValue string) string { + newValue, err := loadFn() if err != nil { - h.logger.Error("failed to load sampling strategies", zap.String("file", filePath), zap.Error(err)) + h.logger.Error("failed to re-load sampling strategies", zap.Error(err)) return lastValue } - newValue := string(currBytes) - if lastValue == newValue { + if lastValue == string(newValue) { return lastValue } - if err = h.updateSamplingStrategy(currBytes); err != nil { - h.logger.Error("failed to update sampling strategies from file", zap.Error(err)) + if err := h.updateSamplingStrategy(newValue); err != nil { + h.logger.Error("failed to update sampling strategies", zap.Error(err)) return lastValue } - return newValue + return string(newValue) } func (h *strategyStore) updateSamplingStrategy(bytes []byte) error { @@ -125,24 +183,22 @@ func (h *strategyStore) updateSamplingStrategy(bytes []byte) error { } // TODO good candidate for a global util function -func loadStrategies(strategiesFile string) (*strategies, error) { - if strategiesFile == "" { - return nil, nil - } - data, err := ioutil.ReadFile(strategiesFile) /* nolint #nosec , this comes from an admin, not user */ +func loadStrategies(loadFn strategyLoader) (*strategies, error) { + strategyBytes, err := loadFn() if err != nil { - return nil, fmt.Errorf("failed to open strategies file: %w", err) + return nil, err } - var strategies strategies - if err := json.Unmarshal(data, &strategies); err != nil { + + var strategies *strategies + if err := json.Unmarshal(strategyBytes, &strategies); err != nil { return nil, fmt.Errorf("failed to unmarshal strategies: %w", err) } - return &strategies, nil + return strategies, nil } func (h *strategyStore) parseStrategies(strategies *strategies) { if strategies == nil { - h.logger.Info("No sampling strategies provided, using defaults") + h.logger.Info("No sampling strategies provided or URL is unavailable, using defaults") return } newStore := defaultStrategies() diff --git a/plugin/sampling/strategystore/static/strategy_store_test.go b/plugin/sampling/strategystore/static/strategy_store_test.go index 29a96c474c2..5b6e79ffd7a 100644 --- a/plugin/sampling/strategystore/static/strategy_store_test.go +++ b/plugin/sampling/strategystore/static/strategy_store_test.go @@ -17,6 +17,8 @@ package static import ( "context" "io/ioutil" + "net/http" + "net/http/httptest" "os" "strings" "testing" @@ -31,6 +33,37 @@ import ( "github.com/jaegertracing/jaeger/thrift-gen/sampling" ) +// Returns strategies in JSON format. Used for testing +// URL option for sampling strategies. +func mockStrategyServer() *httptest.Server { + f := func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/bad-content": + w.Write([]byte("bad-content")) + return + + case "/bad-status": + w.WriteHeader(404) + return + + case "/service-unavailable": + w.WriteHeader(503) + return + + default: + data, err := ioutil.ReadFile("fixtures/strategies.json") + if err != nil { + w.WriteHeader(500) + return + } + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json") + w.Write(data) + } + } + return httptest.NewServer(http.HandlerFunc(f)) +} + func TestStrategyStore(t *testing.T) { _, err := NewStrategyStore(Options{StrategiesFile: "fileNotFound.json"}, zap.NewNop()) assert.EqualError(t, err, "failed to open strategies file: open fileNotFound.json: no such file or directory") @@ -43,7 +76,7 @@ func TestStrategyStore(t *testing.T) { logger, buf := testutils.NewLogger() store, err := NewStrategyStore(Options{}, logger) require.NoError(t, err) - assert.Contains(t, buf.String(), "No sampling strategies provided, using defaults") + assert.Contains(t, buf.String(), "No sampling strategies provided or URL is unavailable, using defaults") s, err := store.GetSamplingStrategy(context.Background(), "foo") require.NoError(t, err) assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.001), *s) @@ -62,6 +95,26 @@ func TestStrategyStore(t *testing.T) { s, err = store.GetSamplingStrategy(context.Background(), "default") require.NoError(t, err) assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.5), *s) + + // Test default strategy when URL is temporarily unavailable. + mockServer := mockStrategyServer() + store, err = NewStrategyStore(Options{StrategiesFile: mockServer.URL+"/service-unavailable"}, logger) + assert.Contains(t, buf.String(), "No sampling strategies provided or URL is unavailable, using defaults") + s, err = store.GetSamplingStrategy(context.Background(), "foo") + require.NoError(t, err) + assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.001), *s) + + // Test downloading strategies from a URL. + store, err = NewStrategyStore(Options{StrategiesFile: mockServer.URL}, logger) + require.NoError(t, err) + + s, err = store.GetSamplingStrategy(context.Background(), "foo") + require.NoError(t, err) + assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.8), *s) + + s, err = store.GetSamplingStrategy(context.Background(), "bar") + require.NoError(t, err) + assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_RATE_LIMITING, 5), *s) } func TestPerOperationSamplingStrategies(t *testing.T) { @@ -276,7 +329,7 @@ func TestAutoUpdateStrategy(t *testing.T) { assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.8), *s) // verify that reloading in no-op - value := store.reloadSamplingStrategyFile(dstFile, string(srcBytes)) + value := store.reloadSamplingStrategy(samplingStrategyLoader(dstFile), string(srcBytes)) assert.Equal(t, string(srcBytes), value) // update file with new probability of 0.9 @@ -293,6 +346,49 @@ func TestAutoUpdateStrategy(t *testing.T) { time.Sleep(1 * time.Millisecond) } assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.9), *s) + + // Test auto update strategy with URL option. + mockServer := mockStrategyServer() + ss, err = NewStrategyStore(Options{ + StrategiesFile: mockServer.URL, + ReloadInterval: 10 * time.Millisecond, + }, zap.NewNop()) + require.NoError(t, err) + store = ss.(*strategyStore) + defer store.Close() + + // copy existing fixture content to restore it later. + srcBytes, err = ioutil.ReadFile(srcFile) + require.NoError(t, err) + originalBytes := srcBytes + + // confirm baseline value + s, err = store.GetSamplingStrategy(context.Background(), "foo") + require.NoError(t, err) + assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.8), *s) + + // verify that reloading in no-op + value = store.reloadSamplingStrategy(samplingStrategyLoader(mockServer.URL), string(srcBytes)) + assert.Equal(t, string(srcBytes), value) + + // update original strategies file with new probability of 0.9 + newStr = strings.Replace(string(srcBytes), "0.8", "0.9", 1) + require.NoError(t, ioutil.WriteFile(srcFile, []byte(newStr), 0644)) + defer func() { + // replace original strategies file with old content. + require.NoError(t, ioutil.WriteFile(srcFile, originalBytes, 0644), "failed to restore original file content") + }() + + // wait for reload timer + for i := 0; i < 1000; i++ { // wait up to 1sec + s, err = store.GetSamplingStrategy(context.Background(), "foo") + require.NoError(t, err) + if s.ProbabilisticSampling != nil && s.ProbabilisticSampling.SamplingRate == 0.9 { + break + } + time.Sleep(1 * time.Millisecond) + } + assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.9), *s) } func TestAutoUpdateStrategyErrors(t *testing.T) { @@ -314,13 +410,25 @@ func TestAutoUpdateStrategyErrors(t *testing.T) { defer store.Close() // check invalid file path or read failure - assert.Equal(t, "blah", store.reloadSamplingStrategyFile(tempFile.Name()+"bad-path", "blah")) - assert.Len(t, logs.FilterMessage("failed to load sampling strategies").All(), 1) + assert.Equal(t, "blah", store.reloadSamplingStrategy(samplingStrategyLoader(tempFile.Name()+"bad-path"), "blah")) + assert.Len(t, logs.FilterMessage("failed to re-load sampling strategies").All(), 1) // check bad file content require.NoError(t, ioutil.WriteFile(tempFile.Name(), []byte("bad value"), 0644)) - assert.Equal(t, "blah", store.reloadSamplingStrategyFile(tempFile.Name(), "blah")) - assert.Len(t, logs.FilterMessage("failed to update sampling strategies from file").All(), 1) + assert.Equal(t, "blah", store.reloadSamplingStrategy(samplingStrategyLoader(tempFile.Name()), "blah")) + assert.Len(t, logs.FilterMessage("failed to update sampling strategies").All(), 1) + + // check invalid url + assert.Equal(t, "duh", store.reloadSamplingStrategy(samplingStrategyLoader("bad-url"), "duh")) + assert.Len(t, logs.FilterMessage("failed to re-load sampling strategies").All(), 2) + + // check status code other than 200 + assert.Equal(t, "duh", store.reloadSamplingStrategy(samplingStrategyLoader(mockStrategyServer().URL+"/bad-status"), "duh")) + assert.Len(t, logs.FilterMessage("failed to re-load sampling strategies").All(), 3) + + // check bad content from url + assert.Equal(t, "duh", store.reloadSamplingStrategy(samplingStrategyLoader(mockStrategyServer().URL+"/bad-content"), "duh")) + assert.Len(t, logs.FilterMessage("failed to update sampling strategies").All(), 2) } func TestServiceNoPerOperationStrategies(t *testing.T) { @@ -337,3 +445,21 @@ func TestServiceNoPerOperationStrategies(t *testing.T) { expected := makeResponse(sampling.SamplingStrategyType_RATE_LIMITING, 3) assert.Equal(t, *expected.RateLimitingSampling, *s.RateLimitingSampling) } + +func TestSamplingStrategyLoader(t *testing.T) { + // invalid file path + loader := samplingStrategyLoader("not-exists") + _, err := loader() + assert.Contains(t, err.Error(), "failed to open strategies file") + + // status code other than 200 + mockServer := mockStrategyServer() + loader = samplingStrategyLoader(mockServer.URL + "/bad-status") + _, err = loader() + assert.Contains(t, err.Error(), "receiving 404 Not Found while downloading strategies file") + + // should download content from URL + loader = samplingStrategyLoader(mockServer.URL + "/bad-content") + content, err := loader() + assert.Equal(t, "bad-content", string(content)) +}