Skip to content

Commit

Permalink
Refactoring : changing routerV2 to dispatcher (#1216)
Browse files Browse the repository at this point in the history
Currently, we have two `router_v2.go` files, which is very confusing:
- one is member of `frontend_connector` package and is responsible for
dispatching
- second is a member of `quesma` package and contains logic for `http`
routing definition

This PR updates the first one.
  • Loading branch information
pdelewski authored Jan 24, 2025
1 parent a048ec7 commit 459ace6
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 48 deletions.
24 changes: 12 additions & 12 deletions quesma/frontend_connectors/basic_http_frontend_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type BasicHTTPFrontendConnector struct {
mutex sync.Mutex
responseMutator func(w http.ResponseWriter) http.ResponseWriter
endpoint string
routerInstance *RouterV2
dispatcher *Dispatcher
logManager *clickhouse.LogManager
registry schema.Registry
config *config.QuesmaConfiguration
Expand All @@ -40,8 +40,8 @@ func (h *BasicHTTPFrontendConnector) GetChildComponents() []interface{} {
components = append(components, h.router)
}

if h.routerInstance != nil {
components = append(components, h.routerInstance)
if h.dispatcher != nil {
components = append(components, h.dispatcher)
}
return components
}
Expand All @@ -52,18 +52,18 @@ func (h *BasicHTTPFrontendConnector) SetDependencies(deps quesma_api.Dependencie
h.logger = deps.Logger()

deps.PhoneHomeAgent().FailedRequestsCollector(func() int64 {
return h.routerInstance.FailedRequests.Load()
return h.dispatcher.FailedRequests.Load()
})
}

func NewBasicHTTPFrontendConnector(endpoint string, config *config.QuesmaConfiguration) *BasicHTTPFrontendConnector {

return &BasicHTTPFrontendConnector{
endpoint: endpoint,
config: config,
routerInstance: NewRouterV2(config),
logManager: nil,
registry: nil,
endpoint: endpoint,
config: config,
dispatcher: NewDispatcher(config),
logManager: nil,
registry: nil,
responseMutator: func(w http.ResponseWriter) http.ResponseWriter {
return w
},
Expand Down Expand Up @@ -127,7 +127,7 @@ func (h *BasicHTTPFrontendConnector) finalHandler(w http.ResponseWriter, req *ht
h.phoneHomeClient.UserAgentCounters().Add(ua, 1)
}

h.routerInstance.Reroute(req.Context(), w, req, reqBody, h.router, h.logManager, h.registry)
h.dispatcher.Reroute(req.Context(), w, req, reqBody, h.router, h.logManager, h.registry)
}

func (h *BasicHTTPFrontendConnector) Listen() error {
Expand Down Expand Up @@ -175,8 +175,8 @@ func ReadRequestBody(request *http.Request) ([]byte, error) {
return reqBody, nil
}

func (h *BasicHTTPFrontendConnector) GetRouterInstance() *RouterV2 {
return h.routerInstance
func (h *BasicHTTPFrontendConnector) GetDispatcherInstance() *Dispatcher {
return h.dispatcher
}

func (h *BasicHTTPFrontendConnector) AddMiddleware(middleware http.Handler) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ import (
"time"
)

func responseFromElasticV2(ctx context.Context, elkResponse *http.Response, w http.ResponseWriter) {
func responseFromElastic(ctx context.Context, elkResponse *http.Response, w http.ResponseWriter) {
if id, ok := ctx.Value(tracing.RequestIdCtxKey).(string); ok {
logger.Debug().Str(logger.RID, id).Msg("responding from Elasticsearch")
}

copyHeadersV2(w, elkResponse)
copyHeaders(w, elkResponse)
w.Header().Set(QuesmaSourceHeader, QuesmaSourceElastic)
// io.Copy calls WriteHeader implicitly
w.WriteHeader(elkResponse.StatusCode)
Expand All @@ -49,7 +49,7 @@ func responseFromElasticV2(ctx context.Context, elkResponse *http.Response, w ht
elkResponse.Body.Close()
}

func responseFromQuesmaV2(ctx context.Context, unzipped []byte, w http.ResponseWriter, quesmaResponse *quesma_api.Result, zip bool) {
func responseFromQuesma(ctx context.Context, unzipped []byte, w http.ResponseWriter, quesmaResponse *quesma_api.Result, zip bool) {
if quesmaResponse == nil {
logger.Error().Msg("responseFromQuesmaV2: quesmaResponse is nil")
return
Expand All @@ -75,7 +75,7 @@ func responseFromQuesmaV2(ctx context.Context, unzipped []byte, w http.ResponseW
}
}

type RouterV2 struct {
type Dispatcher struct {
Config *config.QuesmaConfiguration
RequestPreprocessors quesma_api.ProcessorChain

Expand All @@ -86,11 +86,11 @@ type RouterV2 struct {
phoneHomeAgent diag.PhoneHomeClient
}

func (r *RouterV2) SetDependencies(deps quesma_api.Dependencies) {
func (r *Dispatcher) SetDependencies(deps quesma_api.Dependencies) {
r.debugInfoCollector = deps.DebugInfoCollector()
r.phoneHomeAgent = deps.PhoneHomeAgent()
}
func NewRouterV2(config *config.QuesmaConfiguration) *RouterV2 {
func NewDispatcher(config *config.QuesmaConfiguration) *Dispatcher {
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
Expand All @@ -101,18 +101,18 @@ func NewRouterV2(config *config.QuesmaConfiguration) *RouterV2 {
requestProcessors := quesma_api.ProcessorChain{}
requestProcessors = append(requestProcessors, quesma_api.NewTraceIdPreprocessor())

return &RouterV2{
return &Dispatcher{
Config: config,
RequestPreprocessors: requestProcessors,
HttpClient: client,
}
}

func (r *RouterV2) RegisterPreprocessor(preprocessor quesma_api.RequestPreprocessor) {
func (r *Dispatcher) RegisterPreprocessor(preprocessor quesma_api.RequestPreprocessor) {
r.RequestPreprocessors = append(r.RequestPreprocessors, preprocessor)
}

func (r *RouterV2) errorResponseV2(ctx context.Context, err error, w http.ResponseWriter) {
func (r *Dispatcher) errorResponse(ctx context.Context, err error, w http.ResponseWriter) {
r.FailedRequests.Add(1)

msg := "Internal Quesma Error.\nPlease contact support if the problem persists."
Expand Down Expand Up @@ -140,10 +140,10 @@ func (r *RouterV2) errorResponseV2(ctx context.Context, err error, w http.Respon

// We should not send our error message to the client. There can be sensitive information in it.
// We will send ID of failed request instead
responseFromQuesmaV2(ctx, []byte(fmt.Sprintf("%s\nRequest ID: %s\n", msg, requestId)), w, result, false)
responseFromQuesma(ctx, []byte(fmt.Sprintf("%s\nRequest ID: %s\n", msg, requestId)), w, result, false)
}

func (*RouterV2) closedIndexResponse(ctx context.Context, w http.ResponseWriter, pattern string) {
func (*Dispatcher) closedIndexResponse(ctx context.Context, w http.ResponseWriter, pattern string) {
// TODO we should return a proper status code here (400?)
w.WriteHeader(http.StatusOK)

Expand All @@ -170,7 +170,7 @@ func (*RouterV2) closedIndexResponse(ctx context.Context, w http.ResponseWriter,

}

func (r *RouterV2) ElasticFallback(decision *quesma_api.Decision,
func (r *Dispatcher) ElasticFallback(decision *quesma_api.Decision,
ctx context.Context, w http.ResponseWriter,
req *http.Request, reqBody []byte, logManager *clickhouse.LogManager, schemaRegistry schema.Registry) {

Expand All @@ -181,7 +181,7 @@ func (r *RouterV2) ElasticFallback(decision *quesma_api.Decision,
if decision.Err != nil {
w.Header().Set(QuesmaSourceHeader, QuesmaSourceClickhouse)
AddProductAndContentHeaders(req.Header, w.Header())
r.errorResponseV2(ctx, decision.Err, w)
r.errorResponse(ctx, decision.Err, w)
return
}

Expand Down Expand Up @@ -224,7 +224,7 @@ func (r *RouterV2) ElasticFallback(decision *quesma_api.Decision,
rawResponse := <-r.sendHttpRequestToElastic(ctx, req, reqBody, true)
response := rawResponse.response
if response != nil {
responseFromElasticV2(ctx, response, w)
responseFromElastic(ctx, response, w)
} else {
w.Header().Set(QuesmaSourceHeader, QuesmaSourceElastic)
w.WriteHeader(500)
Expand All @@ -233,11 +233,11 @@ func (r *RouterV2) ElasticFallback(decision *quesma_api.Decision,
}
}
} else {
r.errorResponseV2(ctx, end_user_errors.ErrNoConnector.New(fmt.Errorf("no connector found")), w)
r.errorResponse(ctx, end_user_errors.ErrNoConnector.New(fmt.Errorf("no connector found")), w)
}
}

func (r *RouterV2) Reroute(ctx context.Context, w http.ResponseWriter, req *http.Request, reqBody []byte, router quesma_api.Router, logManager *clickhouse.LogManager, schemaRegistry schema.Registry) {
func (r *Dispatcher) Reroute(ctx context.Context, w http.ResponseWriter, req *http.Request, reqBody []byte, router quesma_api.Router, logManager *clickhouse.LogManager, schemaRegistry schema.Registry) {
defer recovery.LogAndHandlePanic(ctx, func(err error) {
w.WriteHeader(500)
w.Write(queryparser.InternalQuesmaError("Unknown Quesma error"))
Expand Down Expand Up @@ -270,7 +270,7 @@ func (r *RouterV2) Reroute(ctx context.Context, w http.ResponseWriter, req *http
}
dispatcher := &quesma_api.Dispatcher{}
if handlersPipe != nil {
quesmaResponse, err := recordRequestToClickhouseV2(req.URL.Path, r.debugInfoCollector, func() (*quesma_api.Result, error) {
quesmaResponse, err := recordRequestToClickhouse(req.URL.Path, r.debugInfoCollector, func() (*quesma_api.Result, error) {
var result *quesma_api.Result
result, err = handlersPipe.Handler(ctx, quesmaRequest, w)

Expand Down Expand Up @@ -318,10 +318,10 @@ func (r *RouterV2) Reroute(ctx context.Context, w http.ResponseWriter, req *http
}
AddProductAndContentHeaders(req.Header, w.Header())

responseFromQuesmaV2(ctx, unzipped, w, quesmaResponse, zip)
responseFromQuesma(ctx, unzipped, w, quesmaResponse, zip)

} else {
r.errorResponseV2(ctx, err, w)
r.errorResponse(ctx, err, w)
}
} else {
if router.GetFallbackHandler() != nil {
Expand Down Expand Up @@ -350,15 +350,15 @@ func preprocessRequest(ctx context.Context, quesmaRequest *quesma_api.Request, r
return processedRequest, ctx, nil
}

type elasticResultV2 struct {
type elasticResult struct {
response *http.Response
error error
took time.Duration
}

func (r *RouterV2) sendHttpRequestToElastic(ctx context.Context, req *http.Request,
reqBody []byte, isManagement bool) chan elasticResultV2 {
elkResponseChan := make(chan elasticResultV2)
func (r *Dispatcher) sendHttpRequestToElastic(ctx context.Context, req *http.Request,
reqBody []byte, isManagement bool) chan elasticResult {
elkResponseChan := make(chan elasticResult)

// If Quesma is exposing unauthenticated API but underlying Elasticsearch requires authentication, we should add the
if r.Config.DisableAuth && req.Header.Get("Authorization") == "" && r.Config.Elasticsearch.User != "" {
Expand All @@ -377,7 +377,7 @@ func (r *RouterV2) sendHttpRequestToElastic(ctx context.Context, req *http.Reque
}

go func() {
elkResponseChan <- recordRequestToElasticV2(req.URL.Path, r.debugInfoCollector, func() elasticResultV2 {
elkResponseChan <- recordRequestToElastic(req.URL.Path, r.debugInfoCollector, func() elasticResult {

isWrite := elasticsearch.IsWriteRequest(req)

Expand All @@ -400,23 +400,23 @@ func (r *RouterV2) sendHttpRequestToElastic(ctx context.Context, req *http.Reque

resp, err := r.sendHttpRequest(ctx, r.Config.Elasticsearch.Url.String(), req, reqBody)
took := span.End(err)
return elasticResultV2{resp, err, took}
return elasticResult{resp, err, took}
})
}()
return elkResponseChan
}

func isResponseOkV2(resp *http.Response) bool {
func isResponseOk(resp *http.Response) bool {
return resp != nil && resp.StatusCode >= 200 && resp.StatusCode < 500
}

func isIngestV2(path string) bool {
func isIngest(path string) bool {
return strings.HasSuffix(path, routes.BulkPath) // We may add more methods in future such as `_put` or `_create`
}

func recordRequestToClickhouseV2(path string, qmc diag.DebugInfoCollector, requestFunc func() (*quesma_api.Result, error)) (*quesma_api.Result, error) {
func recordRequestToClickhouse(path string, qmc diag.DebugInfoCollector, requestFunc func() (*quesma_api.Result, error)) (*quesma_api.Result, error) {
statName := ui.RequestStatisticKibana2Clickhouse
if isIngestV2(path) {
if isIngest(path) {
statName = ui.RequestStatisticIngest2Clickhouse
}
now := time.Now()
Expand All @@ -427,15 +427,15 @@ func recordRequestToClickhouseV2(path string, qmc diag.DebugInfoCollector, reque
return response, err
}

func recordRequestToElasticV2(path string, qmc diag.DebugInfoCollector, requestFunc func() elasticResultV2) elasticResultV2 {
func recordRequestToElastic(path string, qmc diag.DebugInfoCollector, requestFunc func() elasticResult) elasticResult {
statName := ui.RequestStatisticKibana2Elasticsearch
if isIngestV2(path) {
if isIngest(path) {
statName = ui.RequestStatisticIngest2Elasticsearch
}
now := time.Now()
response := requestFunc()
if qmc != nil {
qmc.RecordRequest(statName, time.Since(now), !isResponseOkV2(response.response))
qmc.RecordRequest(statName, time.Since(now), !isResponseOk(response.response))
}
return response
}
Expand Down Expand Up @@ -470,7 +470,7 @@ func PeekBodyV2(r *http.Request) ([]byte, error) {
return reqBody, nil
}

func copyHeadersV2(w http.ResponseWriter, elkResponse *http.Response) {
func copyHeaders(w http.ResponseWriter, elkResponse *http.Response) {
for key, values := range elkResponse.Header {
for _, value := range values {
if key != HttpHeaderContentLength {
Expand All @@ -482,7 +482,7 @@ func copyHeadersV2(w http.ResponseWriter, elkResponse *http.Response) {
}
}

func (r *RouterV2) sendHttpRequest(ctx context.Context, address string, originalReq *http.Request, originalReqBody []byte) (*http.Response, error) {
func (r *Dispatcher) sendHttpRequest(ctx context.Context, address string, originalReq *http.Request, originalReqBody []byte) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, originalReq.Method, address+originalReq.URL.String(), bytes.NewBuffer(originalReqBody))

if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions quesma/frontend_connectors/elastic_http_frontend_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func NewElasticHttpIngestFrontendConnector(endpoint string,
BasicHTTPFrontendConnector: NewBasicHTTPFrontendConnector(endpoint, config),
}
fallback := func(ctx context.Context, req *quesma_api.Request, writer http.ResponseWriter) (*quesma_api.Result, error) {
fc.BasicHTTPFrontendConnector.GetRouterInstance().ElasticFallback(req.Decision, ctx, writer, req.OriginalRequest, []byte(req.Body), logManager, registry)
fc.BasicHTTPFrontendConnector.GetDispatcherInstance().ElasticFallback(req.Decision, ctx, writer, req.OriginalRequest, []byte(req.Body), logManager, registry)
return nil, nil
}

Expand All @@ -48,7 +48,7 @@ func NewElasticHttpQueryFrontendConnector(endpoint string,
BasicHTTPFrontendConnector: NewBasicHTTPFrontendConnector(endpoint, config),
}
fallback := func(ctx context.Context, req *quesma_api.Request, writer http.ResponseWriter) (*quesma_api.Result, error) {
fc.BasicHTTPFrontendConnector.GetRouterInstance().ElasticFallback(req.Decision, ctx, writer, req.OriginalRequest, []byte(req.Body), logManager, registry)
fc.BasicHTTPFrontendConnector.GetDispatcherInstance().ElasticFallback(req.Decision, ctx, writer, req.OriginalRequest, []byte(req.Body), logManager, registry)
return nil, nil
}
router.AddFallbackHandler(fallback)
Expand Down

0 comments on commit 459ace6

Please sign in to comment.