chore(querier): Refactor the store and querier interface. (#15969)
cyriltovena authored Jan 29, 2025
1 parent 8cf8e5f commit 4df8c60
Showing 24 changed files with 1,095 additions and 722 deletions.
1 change: 0 additions & 1 deletion pkg/compactor/deletion/delete_requests_store.go
@@ -252,7 +252,6 @@ func (ds *deleteRequestsStore) GetCacheGenerationNumber(ctx context.Context, use
}
return false
})

if err != nil {
return "", err
}
10 changes: 6 additions & 4 deletions pkg/loki/modules.go
@@ -67,6 +67,7 @@ import (
"github.com/grafana/loki/v3/pkg/querier"
"github.com/grafana/loki/v3/pkg/querier/queryrange"
"github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/v3/pkg/querier/tail"
"github.com/grafana/loki/v3/pkg/ruler"
base_ruler "github.com/grafana/loki/v3/pkg/ruler/base"
"github.com/grafana/loki/v3/pkg/runtime"
@@ -406,7 +407,7 @@ func (t *Loki) initQuerier() (services.Service, error) {
return nil, err
}

t.Querier, err = querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.Overrides, deleteStore, prometheus.DefaultRegisterer, logger)
t.Querier, err = querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.Overrides, deleteStore, logger)
if err != nil {
return nil, err
}
@@ -556,8 +557,9 @@ func (t *Loki) initQuerier() (services.Service, error) {
// is standalone ALL routes are registered externally, and when it's in the same process as a frontend,
// we disable the proxying of the tail routes in initQueryFrontend() and we still want these routes registered
// on the external router.
t.Server.HTTP.Path("/loki/api/v1/tail").Methods("GET", "POST").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.querierAPI.TailHandler)))
t.Server.HTTP.Path("/api/prom/tail").Methods("GET", "POST").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.querierAPI.TailHandler)))
tailQuerier := tail.NewQuerier(t.ingesterQuerier, t.Querier, deleteStore, t.Overrides, t.Cfg.Querier.TailMaxDuration, tail.NewMetrics(prometheus.DefaultRegisterer), log.With(util_log.Logger, "component", "tail-querier"))
t.Server.HTTP.Path("/loki/api/v1/tail").Methods("GET", "POST").Handler(httpMiddleware.Wrap(http.HandlerFunc(tailQuerier.TailHandler)))
t.Server.HTTP.Path("/api/prom/tail").Methods("GET", "POST").Handler(httpMiddleware.Wrap(http.HandlerFunc(tailQuerier.TailHandler)))

internalMiddlewares := []queryrangebase.Middleware{
serverutil.RecoveryMiddleware,
@@ -1936,7 +1938,7 @@ func (t *Loki) deleteRequestsClient(clientType string, limits limiter.CombinedLi
}

func (t *Loki) createRulerQueryEngine(logger log.Logger, deleteStore deletion.DeleteRequestsClient) (eng *logql.Engine, err error) {
q, err := querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.Overrides, deleteStore, nil, logger)
q, err := querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.Overrides, deleteStore, logger)
if err != nil {
return nil, fmt.Errorf("could not create querier: %w", err)
}
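Two changes are visible from this file's diff: querier.New no longer takes a Prometheus registerer, and the tail endpoints are now served by a querier built via tail.NewQuerier from the new pkg/querier/tail package rather than by the QuerierAPI. The sketch below (not code from this commit) shows the new route shape; the concrete type returned by tail.NewQuerier is not visible in this diff, so it is hidden behind a small local interface, and the plain net/http mux — which does not enforce the GET/POST restriction applied above — is illustrative.

package wiring

import "net/http"

// tailHandlerProvider captures only what the routes below need from the value
// returned by tail.NewQuerier; its concrete type name is not shown in this diff.
type tailHandlerProvider interface {
	TailHandler(w http.ResponseWriter, r *http.Request)
}

// registerTailRoutes mirrors the wiring in initQuerier above: the v1 and the
// legacy /api/prom tail endpoints are both served by the same tail querier.
func registerTailRoutes(mux *http.ServeMux, tq tailHandlerProvider) {
	mux.HandleFunc("/loki/api/v1/tail", tq.TailHandler)
	mux.HandleFunc("/api/prom/tail", tq.TailHandler)
}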
44 changes: 44 additions & 0 deletions pkg/querier/deletion/delete.go
@@ -0,0 +1,44 @@
package deletion

import (
"context"
"time"

"github.com/grafana/dskit/tenant"

"github.com/grafana/loki/v3/pkg/compactor/deletion"
"github.com/grafana/loki/v3/pkg/logproto"
)

type DeleteGetter interface {
GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]deletion.DeleteRequest, error)
}

// DeletesForUserQuery returns the deletes for a user (taken from request context) within a given time range.
func DeletesForUserQuery(ctx context.Context, startT, endT time.Time, g DeleteGetter) ([]*logproto.Delete, error) {
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}

d, err := g.GetAllDeleteRequestsForUser(ctx, userID)
if err != nil {
return nil, err
}

start := startT.UnixNano()
end := endT.UnixNano()

var deletes []*logproto.Delete
for _, del := range d {
if del.StartTime.UnixNano() <= end && del.EndTime.UnixNano() >= start {
deletes = append(deletes, &logproto.Delete{
Selector: del.Query,
Start: del.StartTime.UnixNano(),
End: del.EndTime.UnixNano(),
})
}
}

return deletes, nil
}
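A usage sketch, not part of this commit: DeletesForUserQuery resolves the tenant ID from the context, so callers need a tenant-scoped context. The dskit user import path, the tenant ID, and the helper name below are assumptions for illustration; store can be any DeleteGetter implementation, such as the compactor's delete-requests client.

package example

import (
	"context"
	"fmt"
	"time"

	"github.com/grafana/dskit/user"

	querier_deletion "github.com/grafana/loki/v3/pkg/querier/deletion"
)

// printRecentDeletes is a hypothetical caller: it lists the tenant's delete
// requests that overlap the last hour.
func printRecentDeletes(store querier_deletion.DeleteGetter) error {
	// DeletesForUserQuery reads the tenant from the context, so the org ID
	// must be injected first (normally done by the auth middleware).
	ctx := user.InjectOrgID(context.Background(), "tenant-1")

	end := time.Now()
	start := end.Add(-time.Hour)

	deletes, err := querier_deletion.DeletesForUserQuery(ctx, start, end, store)
	if err != nil {
		return err
	}
	for _, d := range deletes {
		// Selector, Start, and End come from the logproto.Delete fields shown above.
		fmt.Printf("selector=%s start=%d end=%d\n", d.Selector, d.Start, d.End)
	}
	return nil
}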
139 changes: 4 additions & 135 deletions pkg/querier/http.go
@@ -8,7 +8,6 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gorilla/websocket"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/middleware"
"github.com/opentracing/opentracing-go"
@@ -19,27 +18,21 @@ import (
"github.com/grafana/dskit/tenant"

"github.com/grafana/loki/v3/pkg/loghttp"
loghttp_legacy "github.com/grafana/loki/v3/pkg/loghttp/legacy"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/logqlmodel"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
querier_limits "github.com/grafana/loki/v3/pkg/querier/limits"
"github.com/grafana/loki/v3/pkg/querier/queryrange"
index_stats "github.com/grafana/loki/v3/pkg/storage/stores/index/stats"
"github.com/grafana/loki/v3/pkg/util/httpreq"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/marshal"
marshal_legacy "github.com/grafana/loki/v3/pkg/util/marshal/legacy"
serverutil "github.com/grafana/loki/v3/pkg/util/server"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
util_validation "github.com/grafana/loki/v3/pkg/util/validation"
)

const (
wsPingPeriod = 1 * time.Second
)

type QueryResponse struct {
ResultType parser.ValueType `json:"resultType"`
Result parser.Value `json:"result"`
@@ -53,12 +46,12 @@ type Engine interface {
type QuerierAPI struct {
querier Querier
cfg Config
limits Limits
limits querier_limits.Limits
engine Engine
}

// NewQuerierAPI returns an instance of the QuerierAPI.
func NewQuerierAPI(cfg Config, querier Querier, limits Limits, logger log.Logger) *QuerierAPI {
func NewQuerierAPI(cfg Config, querier Querier, limits querier_limits.Limits, logger log.Logger) *QuerierAPI {
engine := logql.NewEngine(cfg.Engine, querier, limits, logger)
return &QuerierAPI{
cfg: cfg,
@@ -128,129 +121,6 @@ func (q *QuerierAPI) LabelHandler(ctx context.Context, req *logproto.LabelReques
return resp, err
}

// TailHandler is a http.HandlerFunc for handling tail queries.
func (q *QuerierAPI) TailHandler(w http.ResponseWriter, r *http.Request) {
upgrader := websocket.Upgrader{
CheckOrigin: func(_ *http.Request) bool { return true },
}
logger := util_log.WithContext(r.Context(), util_log.Logger)

req, err := loghttp.ParseTailQuery(r)
if err != nil {
serverutil.WriteError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), w)
return
}

tenantID, err := tenant.TenantID(r.Context())
if err != nil {
level.Warn(logger).Log("msg", "error getting tenant id", "err", err)
serverutil.WriteError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), w)
return
}

encodingFlags := httpreq.ExtractEncodingFlags(r)
version := loghttp.GetVersion(r.RequestURI)

conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
level.Error(logger).Log("msg", "Error in upgrading websocket", "err", err)
return
}

level.Info(logger).Log("msg", "starting to tail logs", "tenant", tenantID, "selectors", req.Query)

defer func() {
level.Info(logger).Log("msg", "ended tailing logs", "tenant", tenantID, "selectors", req.Query)
}()

defer func() {
if err := conn.Close(); err != nil {
level.Error(logger).Log("msg", "Error closing websocket", "err", err)
}
}()

tailer, err := q.querier.Tail(r.Context(), req, encodingFlags.Has(httpreq.FlagCategorizeLabels))
if err != nil {
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
level.Error(logger).Log("msg", "Error connecting to ingesters for tailing", "err", err)
}
return
}
defer func() {
if err := tailer.close(); err != nil {
level.Error(logger).Log("msg", "Error closing Tailer", "err", err)
}
}()

ticker := time.NewTicker(wsPingPeriod)
defer ticker.Stop()

connWriter := marshal.NewWebsocketJSONWriter(conn)

var response *loghttp_legacy.TailResponse
responseChan := tailer.getResponseChan()
closeErrChan := tailer.getCloseErrorChan()

doneChan := make(chan struct{})
go func() {
for {
_, _, err := conn.ReadMessage()
if err != nil {
if closeErr, ok := err.(*websocket.CloseError); ok {
if closeErr.Code == websocket.CloseNormalClosure {
break
}
level.Error(logger).Log("msg", "Error from client", "err", err)
break
} else if tailer.stopped.Load() {
return
}

level.Error(logger).Log("msg", "Unexpected error from client", "err", err)
break
}
}
doneChan <- struct{}{}
}()

for {
select {
case response = <-responseChan:
var err error
if version == loghttp.VersionV1 {
err = marshal.WriteTailResponseJSON(*response, connWriter, encodingFlags)
} else {
err = marshal_legacy.WriteTailResponseJSON(*response, conn)
}
if err != nil {
level.Error(logger).Log("msg", "Error writing to websocket", "err", err)
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
level.Error(logger).Log("msg", "Error writing close message to websocket", "err", err)
}
return
}

case err := <-closeErrChan:
level.Error(logger).Log("msg", "Error from iterator", "err", err)
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
level.Error(logger).Log("msg", "Error writing close message to websocket", "err", err)
}
return
case <-ticker.C:
// This is to periodically check whether connection is active, useful to clean up dead connections when there are no entries to send
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
level.Error(logger).Log("msg", "Error writing ping message to websocket", "err", err)
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
level.Error(logger).Log("msg", "Error writing close message to websocket", "err", err)
}
return
}
case <-doneChan:
return
}
}
}

// SeriesHandler returns the list of time series that match a certain label set.
// See https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers
func (q *QuerierAPI) SeriesHandler(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, stats.Result, error) {
@@ -420,7 +290,6 @@ func (q *QuerierAPI) validateMaxEntriesLimits(ctx context.Context, expr syntax.E
// DetectedLabelsHandler returns a response for detected labels
func (q *QuerierAPI) DetectedLabelsHandler(ctx context.Context, req *logproto.DetectedLabelsRequest) (*logproto.DetectedLabelsResponse, error) {
resp, err := q.querier.DetectedLabels(ctx, req)

if err != nil {
return nil, err
}
@@ -430,7 +299,7 @@ func (q *QuerierAPI) DetectedLabelsHandler(ctx context.Context, req *logproto.De
// WrapQuerySpanAndTimeout applies a context deadline and a span logger to a query call.
//
// The timeout is based on the per-tenant query timeout configuration.
func WrapQuerySpanAndTimeout(call string, limits Limits) middleware.Interface {
func WrapQuerySpanAndTimeout(call string, limits querier_limits.Limits) middleware.Interface {
return middleware.Func(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
sp, ctx := opentracing.StartSpanFromContext(req.Context(), call)
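Since WrapQuerySpanAndTimeout returns a dskit middleware.Interface, callers wrap an http.Handler with it. Below is a minimal sketch under the assumption that some querier_limits.Limits implementation (for example, the tenant overrides) is available; the span name and helper function are illustrative, not taken from this commit.

package example

import (
	"net/http"

	"github.com/grafana/loki/v3/pkg/querier"
	querier_limits "github.com/grafana/loki/v3/pkg/querier/limits"
)

// wrapLabelHandler applies a tracing span and the per-tenant query timeout
// around an inner handler, mirroring how the querier API routes are wrapped.
func wrapLabelHandler(inner http.Handler, limits querier_limits.Limits) http.Handler {
	// "query.Label" is an illustrative span name; the middleware derives the
	// request deadline from the tenant's query timeout configuration.
	return querier.WrapQuerySpanAndTimeout("query.Label", limits).Wrap(inner)
}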
29 changes: 1 addition & 28 deletions pkg/querier/http_test.go
@@ -51,33 +51,6 @@ func TestInstantQueryHandler(t *testing.T) {
})
}

func TestTailHandler(t *testing.T) {
defaultLimits := defaultLimitsTestConfig()
limits, err := validation.NewOverrides(defaultLimits, nil)
require.NoError(t, err)

api := NewQuerierAPI(mockQuerierConfig(), nil, limits, log.NewNopLogger())

req, err := http.NewRequest("GET", `/`, nil)
require.NoError(t, err)
q := req.URL.Query()
q.Add("query", `{app="loki"}`)
req.URL.RawQuery = q.Encode()
err = req.ParseForm()
require.NoError(t, err)

ctx := user.InjectOrgID(req.Context(), "1|2")
req = req.WithContext(ctx)
require.NoError(t, err)

rr := httptest.NewRecorder()
handler := http.HandlerFunc(api.TailHandler)

handler.ServeHTTP(rr, req)
require.Equal(t, http.StatusBadRequest, rr.Code)
require.Equal(t, "multiple org IDs present", rr.Body.String())
}

type slowConnectionSimulator struct {
sleepFor time.Duration
deadline time.Duration
@@ -88,7 +61,6 @@ func (s *slowConnectionSimulator) ServeHTTP(_ http.ResponseWriter, r *http.Reque
ctx := r.Context()
if err := ctx.Err(); err != nil {
panic(fmt.Sprintf("context already errored: %s", err))

}
time.Sleep(s.sleepFor)

@@ -221,6 +193,7 @@ func TestSeriesHandler(t *testing.T) {
require.JSONEq(t, expected, res.Body.String())
})
}

func TestVolumeHandler(t *testing.T) {
ret := &logproto.VolumeResponse{
Volumes: []logproto.Volume{
(The remaining changed files are not shown here.)