Skip to content

Commit

Permalink
Add logs and metrics to investigate body read issues (#1288)
Browse files Browse the repository at this point in the history
* Add logs and metrics to investigate body read issues
  • Loading branch information
lvrach authored Oct 11, 2021
1 parent 3cc7a71 commit fca90cf
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 8 deletions.
29 changes: 22 additions & 7 deletions gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ type HandleT struct {
requestSizeStat stats.RudderStats
dbWritesStat stats.RudderStats
dbWorkersBufferFullStat, dbWorkersTimeOutStat stats.RudderStats
bodyReadTimeStat stats.RudderStats
trackSuccessCount int
trackFailureCount int
requestMetricLock sync.RWMutex
Expand Down Expand Up @@ -423,7 +424,7 @@ func (gateway *HandleT) userWebRequestWorkerProcess(userWebRequestWorker *userWe
misc.IncrementMapByKey(sourceFailStats, sourceTag, 1)
continue
}
gateway.requestSizeStat.SendTiming(time.Duration(len(body)))
gateway.requestSizeStat.SendTiming(time.Duration(len(body)) * time.Millisecond)
if req.reqType != "batch" {
body, _ = sjson.SetBytes(body, "type", req.reqType)
body, _ = sjson.SetRawBytes(BatchEvent, "batch.0", body)
Expand Down Expand Up @@ -646,12 +647,24 @@ func (gateway *HandleT) eventSchemaWebHandler(wrappedFunc func(http.ResponseWrit
}

func (gateway *HandleT) getPayloadFromRequest(r *http.Request) ([]byte, error) {
if r.Body != nil {
payload, err := io.ReadAll(r.Body)
r.Body.Close()
return payload, err
if r.Body == nil {
return []byte{}, errors.New(response.RequestBodyNil)
}
return []byte{}, errors.New(response.RequestBodyNil)

start := time.Now()
defer gateway.bodyReadTimeStat.SendTiming(time.Since(start))

payload, err := io.ReadAll(r.Body)
r.Body.Close()
if err != nil {
gateway.logger.Errorf(
"Error reading request body, 'Content-Length': %s, partial payload:\n\t%s\n",
r.Header.Get("Content-Length"),
string(payload),
)
return payload, fmt.Errorf("read all request body: %w", err)
}
return payload, nil
}

func (gateway *HandleT) webImportHandler(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -1027,7 +1040,8 @@ func (gateway *HandleT) getPayloadAndWriteKey(w http.ResponseWriter, r *http.Req
sourceTag := gateway.getSourceTagFromWriteKey(writeKey)
misc.IncrementMapByKey(sourceFailStats, sourceTag, 1)
gateway.updateSourceStats(sourceFailStats, "gateway.write_key_failed_requests", map[string]string{sourceTag: writeKey, "reqType": reqType})
return []byte{}, writeKey, err

return []byte{}, writeKey, fmt.Errorf("read payload from request: %w", err)
}
return payload, writeKey, err
}
Expand Down Expand Up @@ -1506,6 +1520,7 @@ func (gateway *HandleT) Setup(application app.Interface, backendConfig backendco
gateway.dbWritesStat = gateway.stats.NewStat("gateway.db_writes", stats.CountType)
gateway.dbWorkersBufferFullStat = gateway.stats.NewStat("gateway.db_workers_buffer_full", stats.CountType)
gateway.dbWorkersTimeOutStat = gateway.stats.NewStat("gateway.db_workers_time_out", stats.CountType)
gateway.bodyReadTimeStat = gateway.stats.NewStat("gateway.http_body_read_time", stats.TimerType)

gateway.backendConfig = backendConfig
gateway.rateLimiter = rateLimiter
Expand Down
2 changes: 1 addition & 1 deletion gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ var _ = Describe("Gateway", func() {
})

It("should reject requests without request body", func() {
expectHandlerResponse(handler, authorizedRequest(WriteKeyInvalid, nil), 400, response.RequestBodyNil+"\n")
expectHandlerResponse(handler, authorizedRequest(WriteKeyInvalid, nil), 400, fmt.Sprintf("read payload from request: %s\n", response.RequestBodyNil))
})

It("should reject requests without valid json in request body", func() {
Expand Down

0 comments on commit fca90cf

Please sign in to comment.