diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index a32c7c6c7a..0e726a330d 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -91,6 +91,8 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string enablePartialResponse := cmd.Flag("query.partial-response", "Enable partial response for queries if no partial_response param is specified."). Default("true").Bool() + storeResponseTimeout := modelDuration(cmd.Flag("store.response-timeout", "If a Store doesn't send any data in this specified duration then a Store will be ignored and partial data will be returned if it's enabled. 0 disables timeout.").Default("0ms")) + m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error { peer, err := newPeerFn(logger, reg, true, *httpAdvertiseAddr, true) if err != nil { @@ -139,6 +141,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string *webPrefixHeaderName, *maxConcurrentQueries, time.Duration(*queryTimeout), + time.Duration(*storeResponseTimeout), *replicaLabel, peer, selectorLset, @@ -254,6 +257,7 @@ func runQuery( webPrefixHeaderName string, maxConcurrentQueries int, queryTimeout time.Duration, + storeResponseTimeout time.Duration, replicaLabel string, peer cluster.Peer, selectorLset labels.Labels, @@ -304,7 +308,7 @@ func runQuery( }, dialOpts, ) - proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset) + proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset, storeResponseTimeout) queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabel) engine = promql.NewEngine( promql.EngineOpts{ diff --git a/docs/components/query.md b/docs/components/query.md index 3a08f70603..972796eeca 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -270,5 +270,10 @@ Flags: if no max_source_resolution param is specified. 
--query.partial-response Enable partial response for queries if no partial_response param is specified. + --store.response-timeout=0ms + If a Store doesn't send any data in this + specified duration then a Store will be ignored + and partial data will be returned if it's + enabled. 0 disables timeout. ``` diff --git a/pkg/query/storeset.go b/pkg/query/storeset.go index bb9c7698df..a660e6566e 100644 --- a/pkg/query/storeset.go +++ b/pkg/query/storeset.go @@ -196,6 +196,10 @@ func (s *storeRef) String() string { return fmt.Sprintf("Addr: %s Labels: %v Mint: %d Maxt: %d", s.addr, s.Labels(), mint, maxt) } +func (s *storeRef) Addr() string { + return s.addr +} + func (s *storeRef) close() { runutil.CloseWithLogOnErr(s.logger, s.cc, fmt.Sprintf("store %v connection close", s.addr)) } diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 556bf77da6..316f803d6d 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -7,6 +7,7 @@ import ( "math" "strings" "sync" + "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -32,6 +33,8 @@ type Client interface { TimeRange() (mint int64, maxt int64) String() string + // Addr returns address of a Client. + Addr() string } // ProxyStore implements the store API that proxies request to all given underlying stores. @@ -40,6 +43,8 @@ type ProxyStore struct { stores func() []Client component component.StoreAPI selectorLabels labels.Labels + + responseTimeout time.Duration } // NewProxyStore returns a new ProxyStore that uses the given clients that implements storeAPI to fan-in all series to the client. 
@@ -49,15 +54,18 @@ func NewProxyStore( stores func() []Client, component component.StoreAPI, selectorLabels labels.Labels, + responseTimeout time.Duration, ) *ProxyStore { if logger == nil { logger = log.NewNopLogger() } + s := &ProxyStore{ - logger: logger, - stores: stores, - component: component, - selectorLabels: selectorLabels, + logger: logger, + stores: stores, + component: component, + selectorLabels: selectorLabels, + responseTimeout: responseTimeout, } return s } @@ -147,7 +155,11 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe } storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s queried", st)) - sc, err := st.Series(gctx, r) + // This is used to cancel this stream when one operation takes too long. + seriesCtx, closeSeries := context.WithCancel(gctx) + defer closeSeries() + + sc, err := st.Series(seriesCtx, r) if err != nil { storeID := fmt.Sprintf("%v", storepb.LabelsToString(st.Labels())) if storeID == "" { @@ -162,12 +174,13 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe continue } - // Schedule streamSeriesSet that translates gRPC streamed response into seriesSet (if series) or respCh if warnings. - seriesSet = append(seriesSet, startStreamSeriesSet(gctx, wg, sc, respSender, st.String(), !r.PartialResponseDisabled)) + // Schedule streamSeriesSet that translates gRPC streamed response + // into seriesSet (if series) or respCh if warnings. 
+ seriesSet = append(seriesSet, startStreamSeriesSet(seriesCtx, s.logger, closeSeries, + wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout)) } level.Debug(s.logger).Log("msg", strings.Join(storeDebugMsgs, ";")) - if len(seriesSet) == 0 { // This is indicates that configured StoreAPIs are not the ones end user expects err := errors.New("No store matched for this query") @@ -196,7 +209,6 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe return err } return nil - } type warnSender interface { @@ -206,6 +218,9 @@ type warnSender interface { // streamSeriesSet iterates over incoming stream of series. // All errors are sent out of band via warning channel. type streamSeriesSet struct { + ctx context.Context + logger log.Logger + stream storepb.Store_SeriesClient warnCh warnSender @@ -215,30 +230,44 @@ type streamSeriesSet struct { errMtx sync.Mutex err error - name string + name string + partialResponse bool + + responseTimeout time.Duration + closeSeries context.CancelFunc } func startStreamSeriesSet( ctx context.Context, + logger log.Logger, + closeSeries context.CancelFunc, wg *sync.WaitGroup, stream storepb.Store_SeriesClient, warnCh warnSender, name string, partialResponse bool, + responseTimeout time.Duration, ) *streamSeriesSet { s := &streamSeriesSet{ - stream: stream, - warnCh: warnCh, - recvCh: make(chan *storepb.Series, 10), - name: name, + ctx: ctx, + logger: logger, + closeSeries: closeSeries, + stream: stream, + warnCh: warnCh, + recvCh: make(chan *storepb.Series, 10), + name: name, + partialResponse: partialResponse, + responseTimeout: responseTimeout, } wg.Add(1) go func() { defer wg.Done() defer close(s.recvCh) + for { r, err := s.stream.Recv() + if err == io.EOF { return } @@ -248,14 +277,15 @@ func startStreamSeriesSet( } if err != nil { + wrapErr := errors.Wrapf(err, "receive series from %s", s.name) if partialResponse { - s.warnCh.send(storepb.NewWarnSeriesResponse(errors.Wrap(err, 
"receive series"))) + s.warnCh.send(storepb.NewWarnSeriesResponse(wrapErr)) return } s.errMtx.Lock() - defer s.errMtx.Unlock() - s.err = err + s.err = wrapErr + s.errMtx.Unlock() return } @@ -269,10 +299,39 @@ func startStreamSeriesSet( return s } -// Next blocks until new message is received or stream is closed. +// Next blocks until new message is received or stream is closed or operation is timed out. func (s *streamSeriesSet) Next() (ok bool) { - s.currSeries, ok = <-s.recvCh - return ok + ctx := s.ctx + timeoutMsg := fmt.Sprintf("failed to receive any data from %s", s.name) + + if s.responseTimeout != 0 { + timeoutMsg = fmt.Sprintf("failed to receive any data in %s from %s", s.responseTimeout.String(), s.name) + + timeoutCtx, done := context.WithTimeout(s.ctx, s.responseTimeout) + defer done() + ctx = timeoutCtx + } + + select { + case s.currSeries, ok = <-s.recvCh: + return ok + case <-ctx.Done(): + // closeSeries to shutdown a goroutine in startStreamSeriesSet. + s.closeSeries() + + err := errors.Wrap(ctx.Err(), timeoutMsg) + if s.partialResponse { + level.Warn(s.logger).Log("err", err, "msg", "returning partial response") + s.warnCh.send(storepb.NewWarnSeriesResponse(err)) + return false + } + s.errMtx.Lock() + s.err = err + s.errMtx.Unlock() + + level.Warn(s.logger).Log("err", err, "msg", "partial response disabled; aborting request") + return false + } } func (s *streamSeriesSet) At() ([]storepb.Label, []storepb.AggrChunk) { diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index fa3e7e110a..ed1c1d3b0a 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -4,6 +4,7 @@ import ( "context" "io" "math" + "os" "testing" "time" @@ -42,6 +43,9 @@ func (c *testClient) String() string { return "test" } +func (c *testClient) Addr() string { + return "testaddr" +} func TestProxyStore_Info(t *testing.T) { defer leaktest.CheckTimeout(t, 10*time.Second)() @@ -51,7 +55,7 @@ func TestProxyStore_Info(t *testing.T) { q := NewProxyStore(nil, 
func() []Client { return nil }, component.Query, - nil, + nil, 0*time.Second, ) resp, err := q.Info(ctx, &storepb.InfoRequest{}) @@ -400,11 +404,135 @@ func TestProxyStore_Series(t *testing.T) { expectedErr: errors.New("fetch series for [name:\"ext\" value:\"1\" ] test: error!"), }, } { + + if ok := t.Run(tc.title, func(t *testing.T) { + q := NewProxyStore(nil, + func() []Client { return tc.storeAPIs }, + component.Query, + tc.selectorLabels, + 0*time.Second, + ) + + s := newStoreSeriesServer(context.Background()) + + err := q.Series(tc.req, s) + if tc.expectedErr != nil { + testutil.NotOk(t, err) + testutil.Equals(t, tc.expectedErr.Error(), err.Error()) + return + } + + testutil.Ok(t, err) + + seriesEqual(t, tc.expectedSeries, s.SeriesSet) + testutil.Equals(t, tc.expectedWarningsLen, len(s.Warnings), "got %v", s.Warnings) + }); !ok { + return + } + } +} + +func TestProxyStore_SeriesSlowStores(t *testing.T) { + enable := os.Getenv("THANOS_ENABLE_STORE_READ_TIMEOUT_TESTS") + if enable == "" { + t.Skip("enable THANOS_ENABLE_STORE_READ_TIMEOUT_TESTS to run store-read-timeout tests") + } + + defer leaktest.CheckTimeout(t, 20*time.Second)() + + for _, tc := range []struct { + title string + storeAPIs []Client + selectorLabels tlabels.Labels + + req *storepb.SeriesRequest + + expectedSeries []rawSeries + expectedErr error + expectedWarningsLen int + }{ + { + title: "partial response disabled one thanos query is slow to respond", + storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + RespDuration: 10 * time.Second, + }, + labels: []storepb.Label{{Name: "ext", Value: "1"}}, + minTime: 1, + maxTime: 300, + }, + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + 
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + }, + labels: []storepb.Label{{Name: "ext", Value: "1"}}, + minTime: 1, + maxTime: 300, + }, + }, + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 300, + Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, + PartialResponseDisabled: true, + }, + expectedErr: errors.New("test: failed to receive any data in 4s from test: context deadline exceeded"), + }, + { + title: "partial response enabled one thanos query is slow to respond", + storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + }, + labels: []storepb.Label{{Name: "ext", Value: "1"}}, + minTime: 1, + maxTime: 300, + }, + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + RespDuration: 10 * time.Second, + }, + labels: []storepb.Label{{Name: "ext", Value: "1"}}, + minTime: 1, + maxTime: 300, + }, + }, + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 300, + Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, + }, + expectedSeries: []rawSeries{ + { + lset: []storepb.Label{{Name: "a", Value: "b"}}, + samples: []sample{{1, 1}, {2, 2}, {3, 3}}, + }, + }, + expectedWarningsLen: 2, + }, + } { if ok := t.Run(tc.title, func(t *testing.T) { q := NewProxyStore(nil, func() []Client { return tc.storeAPIs }, component.Query, tc.selectorLabels, + 4*time.Second, ) s := newStoreSeriesServer(context.Background()) @@ -446,6 +574,7 @@ func TestProxyStore_Series_RequestParamsProxied(t *testing.T) { func() []Client { return cls }, component.Query, nil, + 
0*time.Second, ) ctx := context.Background() @@ -504,6 +633,7 @@ func TestProxyStore_Series_RegressionFillResponseChannel(t *testing.T) { func() []Client { return cls }, component.Query, tlabels.FromStrings("fed", "a"), + 0*time.Second, ) ctx := context.Background() @@ -541,6 +671,7 @@ func TestProxyStore_LabelValues(t *testing.T) { func() []Client { return cls }, component.Query, nil, + 0*time.Second, ) ctx := context.Background() @@ -700,6 +831,7 @@ type mockedStoreAPI struct { RespSeries []*storepb.SeriesResponse RespLabelValues *storepb.LabelValuesResponse RespError error + RespDuration time.Duration LastSeriesReq *storepb.SeriesRequest LastLabelValuesReq *storepb.LabelValuesRequest @@ -712,7 +844,7 @@ func (s *mockedStoreAPI) Info(ctx context.Context, req *storepb.InfoRequest, _ . func (s *mockedStoreAPI) Series(ctx context.Context, req *storepb.SeriesRequest, _ ...grpc.CallOption) (storepb.Store_SeriesClient, error) { s.LastSeriesReq = req - return &StoreSeriesClient{ctx: ctx, respSet: s.RespSeries}, s.RespError + return &StoreSeriesClient{ctx: ctx, respSet: s.RespSeries, respDur: s.RespDuration}, s.RespError } func (s *mockedStoreAPI) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest, _ ...grpc.CallOption) (*storepb.LabelNamesResponse, error) { @@ -732,9 +864,12 @@ type StoreSeriesClient struct { ctx context.Context i int respSet []*storepb.SeriesResponse + respDur time.Duration } func (c *StoreSeriesClient) Recv() (*storepb.SeriesResponse, error) { + time.Sleep(c.respDur) + if c.i >= len(c.respSet) { return nil, io.EOF }