Skip to content

Commit

Permalink
fix: batch writes on entire PRW request
Browse files Browse the repository at this point in the history
Batch the entire Remote-Write request, rather than per-timeseries. This
allows a significantly higher write throughput.

Also, move the tagsFromMetric call out of the inner loop.
  • Loading branch information
karl-nilsson committed Mar 2, 2024
1 parent 35d47e2 commit e9ae4ce
Showing 1 changed file with 17 additions and 15 deletions.
32 changes: 17 additions & 15 deletions bigquerydb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,16 +163,20 @@ func (c *BigqueryClient) Write(timeseries []*prompb.TimeSeries) error {
inserter := c.client.Dataset(c.datasetID).Table(c.tableID).Inserter()
inserter.SkipInvalidRows = true
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()
batch := make([]*Item, 0, len(timeseries))

for i := range timeseries {
ts := timeseries[i]
samples := ts.Samples
batch := make([]*Item, 0, len(samples))
c.recordsFetched.Add(float64(len(samples)))
metric := make(model.Metric, len(ts.Labels))
for _, l := range ts.Labels {
metric[model.LabelName(l.Name)] = model.LabelValue(l.Value)
}

t := tagsFromMetric(metric)

for _, s := range samples {
v := float64(s.Value)
if math.IsNaN(v) || math.IsInf(v, 0) {
Expand All @@ -185,27 +189,25 @@ func (c *BigqueryClient) Write(timeseries []*prompb.TimeSeries) error {
value: v,
metricname: string(metric[model.MetricNameLabel]),
timestamp: model.Time(s.Timestamp).Unix(),
tags: tagsFromMetric(metric),
tags: t,
})

}
}

begin := time.Now()
if err := inserter.Put(ctx, batch); err != nil {
if multiError, ok := err.(bigquery.PutMultiError); ok {
for _, err1 := range multiError {
for _, err2 := range err1.Errors {
fmt.Println(err2)
}
begin := time.Now()
if err := inserter.Put(ctx, batch); err != nil {
if multiError, ok := err.(bigquery.PutMultiError); ok {
for _, err1 := range multiError {
for _, err2 := range err1.Errors {
fmt.Println(err2)
}
}
defer cancel()
return err
}
duration := time.Since(begin).Seconds()
c.batchWriteDuration.Observe(duration)
return err
}
defer cancel()
duration := time.Since(begin).Seconds()
c.batchWriteDuration.Observe(duration)

return nil
}

Expand Down

0 comments on commit e9ae4ce

Please sign in to comment.