Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(querier): Refactor the store and querier interface. #15969

Merged
merged 5 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pkg/compactor/deletion/delete_requests_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,6 @@ func (ds *deleteRequestsStore) GetCacheGenerationNumber(ctx context.Context, use
}
return false
})

if err != nil {
return "", err
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ import (
"github.com/grafana/loki/v3/pkg/querier"
"github.com/grafana/loki/v3/pkg/querier/queryrange"
"github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/v3/pkg/querier/tail"
"github.com/grafana/loki/v3/pkg/ruler"
base_ruler "github.com/grafana/loki/v3/pkg/ruler/base"
"github.com/grafana/loki/v3/pkg/runtime"
Expand Down Expand Up @@ -405,7 +406,7 @@ func (t *Loki) initQuerier() (services.Service, error) {
return nil, err
}

t.Querier, err = querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.Overrides, deleteStore, prometheus.DefaultRegisterer, logger)
t.Querier, err = querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.Overrides, deleteStore, logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -555,8 +556,9 @@ func (t *Loki) initQuerier() (services.Service, error) {
// is standalone ALL routes are registered externally, and when it's in the same process as a frontend,
// we disable the proxying of the tail routes in initQueryFrontend() and we still want these routes registered
// on the external router.
t.Server.HTTP.Path("/loki/api/v1/tail").Methods("GET", "POST").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.querierAPI.TailHandler)))
t.Server.HTTP.Path("/api/prom/tail").Methods("GET", "POST").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.querierAPI.TailHandler)))
tailQuerier := tail.NewQuerier(t.ingesterQuerier, t.Querier, deleteStore, t.Overrides, t.Cfg.Querier.TailMaxDuration, tail.NewTailMetrics(prometheus.DefaultRegisterer), log.With(util_log.Logger, "component", "tail-querier"))
t.Server.HTTP.Path("/loki/api/v1/tail").Methods("GET", "POST").Handler(httpMiddleware.Wrap(http.HandlerFunc(tailQuerier.TailHandler)))
t.Server.HTTP.Path("/api/prom/tail").Methods("GET", "POST").Handler(httpMiddleware.Wrap(http.HandlerFunc(tailQuerier.TailHandler)))

internalMiddlewares := []queryrangebase.Middleware{
serverutil.RecoveryMiddleware,
Expand Down Expand Up @@ -1936,7 +1938,7 @@ func (t *Loki) deleteRequestsClient(clientType string, limits limiter.CombinedLi
}

func (t *Loki) createRulerQueryEngine(logger log.Logger, deleteStore deletion.DeleteRequestsClient) (eng *logql.Engine, err error) {
q, err := querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.Overrides, deleteStore, nil, logger)
q, err := querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.Overrides, deleteStore, logger)
if err != nil {
return nil, fmt.Errorf("could not create querier: %w", err)
}
Expand Down
44 changes: 44 additions & 0 deletions pkg/querier/deletion/delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package deletion

import (
"context"
"time"

"github.com/grafana/dskit/tenant"

"github.com/grafana/loki/v3/pkg/compactor/deletion"
"github.com/grafana/loki/v3/pkg/logproto"
)

// DeleteGetter is the lookup dependency used by DeletesForUserQuery to fetch
// the delete requests that belong to a single user.
type DeleteGetter interface {
// GetAllDeleteRequestsForUser returns the delete requests for userID, or an
// error if they cannot be retrieved.
GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]deletion.DeleteRequest, error)
}

// DeletesForUserQuery looks up the tenant carried by ctx, fetches that
// tenant's delete requests from g, and returns those whose time span overlaps
// the query window [startT, endT]. Both window bounds are inclusive: a request
// that merely touches either edge of the window is kept.
//
// An error is returned if the tenant cannot be resolved from ctx or if g
// fails. When no request overlaps the window the returned slice is nil.
func DeletesForUserQuery(ctx context.Context, startT, endT time.Time, g DeleteGetter) ([]*logproto.Delete, error) {
	tenantID, err := tenant.TenantID(ctx)
	if err != nil {
		return nil, err
	}

	requests, err := g.GetAllDeleteRequestsForUser(ctx, tenantID)
	if err != nil {
		return nil, err
	}

	var (
		windowStart = startT.UnixNano()
		windowEnd   = endT.UnixNano()
		overlapping []*logproto.Delete
	)

	for i := range requests {
		reqStart := requests[i].StartTime.UnixNano()
		reqEnd := requests[i].EndTime.UnixNano()

		// Skip requests that lie entirely after or entirely before the window.
		if reqStart > windowEnd || reqEnd < windowStart {
			continue
		}

		overlapping = append(overlapping, &logproto.Delete{
			Selector: requests[i].Query,
			Start:    reqStart,
			End:      reqEnd,
		})
	}

	return overlapping, nil
}
139 changes: 4 additions & 135 deletions pkg/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gorilla/websocket"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/middleware"
"github.com/opentracing/opentracing-go"
Expand All @@ -19,27 +18,21 @@ import (
"github.com/grafana/dskit/tenant"

"github.com/grafana/loki/v3/pkg/loghttp"
loghttp_legacy "github.com/grafana/loki/v3/pkg/loghttp/legacy"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/logqlmodel"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
querier_limits "github.com/grafana/loki/v3/pkg/querier/limits"
"github.com/grafana/loki/v3/pkg/querier/queryrange"
index_stats "github.com/grafana/loki/v3/pkg/storage/stores/index/stats"
"github.com/grafana/loki/v3/pkg/util/httpreq"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/marshal"
marshal_legacy "github.com/grafana/loki/v3/pkg/util/marshal/legacy"
serverutil "github.com/grafana/loki/v3/pkg/util/server"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
util_validation "github.com/grafana/loki/v3/pkg/util/validation"
)

const (
wsPingPeriod = 1 * time.Second
)

type QueryResponse struct {
ResultType parser.ValueType `json:"resultType"`
Result parser.Value `json:"result"`
Expand All @@ -53,12 +46,12 @@ type Engine interface {
type QuerierAPI struct {
querier Querier
cfg Config
limits Limits
limits querier_limits.Limits
engine Engine
}

// NewQuerierAPI returns an instance of the QuerierAPI.
func NewQuerierAPI(cfg Config, querier Querier, limits Limits, logger log.Logger) *QuerierAPI {
func NewQuerierAPI(cfg Config, querier Querier, limits querier_limits.Limits, logger log.Logger) *QuerierAPI {
engine := logql.NewEngine(cfg.Engine, querier, limits, logger)
return &QuerierAPI{
cfg: cfg,
Expand Down Expand Up @@ -128,129 +121,6 @@ func (q *QuerierAPI) LabelHandler(ctx context.Context, req *logproto.LabelReques
return resp, err
}

// TailHandler is a http.HandlerFunc for handling tail queries.
//
// The request is validated first (tail query parameters, then tenant ID); a
// failure in either step is answered with 400 Bad Request and no websocket
// upgrade is attempted. After a successful upgrade the handler streams tail
// responses to the client until the client goes away, the tailer reports an
// error, or a write to the websocket fails.
func (q *QuerierAPI) TailHandler(w http.ResponseWriter, r *http.Request) {
// The origin check always passes, so upgrades are accepted from any origin.
upgrader := websocket.Upgrader{
CheckOrigin: func(_ *http.Request) bool { return true },
}
logger := util_log.WithContext(r.Context(), util_log.Logger)

req, err := loghttp.ParseTailQuery(r)
if err != nil {
serverutil.WriteError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), w)
return
}

tenantID, err := tenant.TenantID(r.Context())
if err != nil {
level.Warn(logger).Log("msg", "error getting tenant id", "err", err)
serverutil.WriteError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), w)
return
}

// Both values are derived from the HTTP request before the connection is upgraded.
encodingFlags := httpreq.ExtractEncodingFlags(r)
version := loghttp.GetVersion(r.RequestURI)

conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
// The upgrade failure is only logged here; nothing else is written by this handler.
level.Error(logger).Log("msg", "Error in upgrading websocket", "err", err)
return
}

level.Info(logger).Log("msg", "starting to tail logs", "tenant", tenantID, "selectors", req.Query)

// Deferred calls run in reverse order of registration: the tailer (registered
// further down) is closed first, then the websocket, and this log line is last.
defer func() {
level.Info(logger).Log("msg", "ended tailing logs", "tenant", tenantID, "selectors", req.Query)
}()

defer func() {
if err := conn.Close(); err != nil {
level.Error(logger).Log("msg", "Error closing websocket", "err", err)
}
}()

tailer, err := q.querier.Tail(r.Context(), req, encodingFlags.Has(httpreq.FlagCategorizeLabels))
if err != nil {
// The Tail error is reported to the client inside a close frame.
// NOTE(review): the log line below fires only when writing that close frame
// fails, and it logs the write error, not the Tail error; the message text
// ("Error connecting to ingesters") does not describe that situation — confirm intent.
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
level.Error(logger).Log("msg", "Error connecting to ingesters for tailing", "err", err)
}
return
}
defer func() {
if err := tailer.close(); err != nil {
level.Error(logger).Log("msg", "Error closing Tailer", "err", err)
}
}()

ticker := time.NewTicker(wsPingPeriod)
defer ticker.Stop()

connWriter := marshal.NewWebsocketJSONWriter(conn)

var response *loghttp_legacy.TailResponse
responseChan := tailer.getResponseChan()
closeErrChan := tailer.getCloseErrorChan()

// Reader goroutine: incoming client messages are read and discarded; the
// reads exist only to notice when the client closes or the connection breaks.
doneChan := make(chan struct{})
go func() {
for {
_, _, err := conn.ReadMessage()
if err != nil {
if closeErr, ok := err.(*websocket.CloseError); ok {
// A normal closure is not logged; any other close code is.
if closeErr.Code == websocket.CloseNormalClosure {
break
}
level.Error(logger).Log("msg", "Error from client", "err", err)
break
} else if tailer.stopped.Load() {
// The tailer is already stopped, so the goroutine exits without
// signalling doneChan.
return
}

level.Error(logger).Log("msg", "Unexpected error from client", "err", err)
break
}
}
// NOTE(review): doneChan is unbuffered; if the select loop below has already
// returned when this line is reached, the send has no receiver and this
// goroutine would block — confirm whether that interleaving can occur.
doneChan <- struct{}{}
}()

// Main loop: forward tail responses, surface tailer errors, ping the client
// periodically, and stop once the reader goroutine signals completion.
for {
select {
case response = <-responseChan:
var err error
// V1 requests get the current JSON encoding; everything else gets the legacy one.
if version == loghttp.VersionV1 {
err = marshal.WriteTailResponseJSON(*response, connWriter, encodingFlags)
} else {
err = marshal_legacy.WriteTailResponseJSON(*response, conn)
}
if err != nil {
level.Error(logger).Log("msg", "Error writing to websocket", "err", err)
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
level.Error(logger).Log("msg", "Error writing close message to websocket", "err", err)
}
return
}

case err := <-closeErrChan:
level.Error(logger).Log("msg", "Error from iterator", "err", err)
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
level.Error(logger).Log("msg", "Error writing close message to websocket", "err", err)
}
return
case <-ticker.C:
// This is to periodically check whether connection is active, useful to clean up dead connections when there are no entries to send
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
level.Error(logger).Log("msg", "Error writing ping message to websocket", "err", err)
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
level.Error(logger).Log("msg", "Error writing close message to websocket", "err", err)
}
return
}
case <-doneChan:
return
}
}
}

// SeriesHandler returns the list of time series that match a certain label set.
// See https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers
func (q *QuerierAPI) SeriesHandler(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, stats.Result, error) {
Expand Down Expand Up @@ -420,7 +290,6 @@ func (q *QuerierAPI) validateMaxEntriesLimits(ctx context.Context, expr syntax.E
// DetectedLabelsHandler returns a response for detected labels
func (q *QuerierAPI) DetectedLabelsHandler(ctx context.Context, req *logproto.DetectedLabelsRequest) (*logproto.DetectedLabelsResponse, error) {
resp, err := q.querier.DetectedLabels(ctx, req)

if err != nil {
return nil, err
}
Expand All @@ -430,7 +299,7 @@ func (q *QuerierAPI) DetectedLabelsHandler(ctx context.Context, req *logproto.De
// WrapQuerySpanAndTimeout applies a context deadline and a span logger to a query call.
//
// The timeout is based on the per-tenant query timeout configuration.
func WrapQuerySpanAndTimeout(call string, limits Limits) middleware.Interface {
func WrapQuerySpanAndTimeout(call string, limits querier_limits.Limits) middleware.Interface {
return middleware.Func(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
sp, ctx := opentracing.StartSpanFromContext(req.Context(), call)
Expand Down
29 changes: 1 addition & 28 deletions pkg/querier/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,33 +51,6 @@ func TestInstantQueryHandler(t *testing.T) {
})
}

func TestTailHandler(t *testing.T) {
defaultLimits := defaultLimitsTestConfig()
limits, err := validation.NewOverrides(defaultLimits, nil)
require.NoError(t, err)

api := NewQuerierAPI(mockQuerierConfig(), nil, limits, log.NewNopLogger())

req, err := http.NewRequest("GET", `/`, nil)
require.NoError(t, err)
q := req.URL.Query()
q.Add("query", `{app="loki"}`)
req.URL.RawQuery = q.Encode()
err = req.ParseForm()
require.NoError(t, err)

ctx := user.InjectOrgID(req.Context(), "1|2")
req = req.WithContext(ctx)
require.NoError(t, err)

rr := httptest.NewRecorder()
handler := http.HandlerFunc(api.TailHandler)

handler.ServeHTTP(rr, req)
require.Equal(t, http.StatusBadRequest, rr.Code)
require.Equal(t, "multiple org IDs present", rr.Body.String())
}

type slowConnectionSimulator struct {
sleepFor time.Duration
deadline time.Duration
Expand All @@ -88,7 +61,6 @@ func (s *slowConnectionSimulator) ServeHTTP(_ http.ResponseWriter, r *http.Reque
ctx := r.Context()
if err := ctx.Err(); err != nil {
panic(fmt.Sprintf("context already errored: %s", err))

}
time.Sleep(s.sleepFor)

Expand Down Expand Up @@ -221,6 +193,7 @@ func TestSeriesHandler(t *testing.T) {
require.JSONEq(t, expected, res.Body.String())
})
}

func TestVolumeHandler(t *testing.T) {
ret := &logproto.VolumeResponse{
Volumes: []logproto.Volume{
Expand Down
Loading
Loading