From 7915afe7c58375305f222799b0bda1f127a87dc0 Mon Sep 17 00:00:00 2001
From: Thomas Barker
Date: Thu, 13 Jan 2022 20:36:56 +0000
Subject: [PATCH] Add batching support to metrics-exporter.

---
 exporter/collector/metricsexporter.go | 184 +++++++++++++++++++++-----
 1 file changed, 149 insertions(+), 35 deletions(-)

diff --git a/exporter/collector/metricsexporter.go b/exporter/collector/metricsexporter.go
index af86afb1c..7004e6443 100644
--- a/exporter/collector/metricsexporter.go
+++ b/exporter/collector/metricsexporter.go
@@ -24,7 +24,9 @@ import (
 	"math"
 	"net/url"
 	"path"
+	"runtime"
 	"strings"
+	"sync"
 	"time"
 	"unicode"
 
@@ -39,6 +41,7 @@ import (
 	metricpb "google.golang.org/genproto/googleapis/api/metric"
 	monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
 	monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
+	"google.golang.org/grpc/status"
 	"google.golang.org/protobuf/types/known/anypb"
 	"google.golang.org/protobuf/types/known/timestamppb"
 )
@@ -55,10 +58,16 @@ type MetricsExporter struct {
 	client *monitoring.MetricClient
 	obs    selfObservability
 	mapper metricMapper
+
 	// A channel that receives metric descriptor and sends them to GCM once.
-	mds chan *metricpb.MetricDescriptor
-	// Channel that is closed when exportMetricDescriptorRunnner goroutine is finished
-	mdsDone chan struct{}
+	metricDescriptorC chan *metricpb.MetricDescriptor
+	mdCache           map[string]*metricpb.MetricDescriptor
+
+	timeSeriesC         chan *monitoringpb.TimeSeries
+	pendingTimeSerieses []*monitoringpb.TimeSeries
+	goroutines          sync.WaitGroup
+	batchTimeoutTimer   *time.Timer
+	shutdownC           chan struct{}
 }
 
 // metricMapper is the part that transforms metrics. Separate from MetricsExporter since it has
@@ -74,12 +83,24 @@ const (
 	SummaryPercentileSuffix = "_summary_percentile"
 )
 
+const (
+	batchTimeout     = 1 * time.Second
+	sendBatchSize    = 200
+	sendBatchMaxSize = 200
+)
+
 type labels map[string]string
 
 func (me *MetricsExporter) Shutdown(ctx context.Context) error {
-	close(me.mds)
+	close(me.shutdownC)
+	c := make(chan struct{})
+	go func() {
+		// Wait until all goroutines are done.
+		me.goroutines.Wait()
+		c <- struct{}{}
+	}()
 	select {
-	case <-me.mdsDone:
+	case <-c:
 	case <-ctx.Done():
 		me.obs.log.Error("Error waiting for async CreateMetricDescriptor calls to finish.", zap.Error(ctx.Err()))
 	}
@@ -126,22 +147,25 @@ func NewGoogleCloudMetricsExporter(
 		obs:    selfObservability{log: log},
 		mapper: metricMapper{cfg},
 		// We create a buffered channel for metric descriptors.
-		// MetricDescritpors are asycnhronously sent and optimistic.
+		// MetricDescriptors are sent asynchronously and optimistically.
 		// We only get Unit/Description/Display name from them, so it's ok
 		// to drop / conserve resources for sending timeseries.
-		mds:     make(chan *metricpb.MetricDescriptor, cfg.MetricConfig.CreateMetricDescriptorBufferSize),
-		mdsDone: make(chan struct{}),
+		metricDescriptorC: make(chan *metricpb.MetricDescriptor, cfg.MetricConfig.CreateMetricDescriptorBufferSize),
+		mdCache:           make(map[string]*metricpb.MetricDescriptor),
+		timeSeriesC:       make(chan *monitoringpb.TimeSeries, runtime.NumCPU()),
+		shutdownC:         make(chan struct{}, 1),
 	}
+	mExp.goroutines.Add(2)
 
 	// Fire up the metric descriptor exporter.
 	go mExp.exportMetricDescriptorRunner()
+	go mExp.exportTimeSeriesRunner()
 
 	return mExp, nil
 }
 
 // PushMetrics calls pushes pdata metrics to GCM, creating metric descriptors if necessary
 func (me *MetricsExporter) PushMetrics(ctx context.Context, m pdata.Metrics) error {
-	timeSeries := make([]*monitoringpb.TimeSeries, 0, m.DataPointCount())
 	rms := m.ResourceMetrics()
 	for i := 0; i < rms.Len(); i++ {
 		rm := rms.At(i)
@@ -156,7 +180,9 @@ func (me *MetricsExporter) PushMetrics(ctx context.Context, m pdata.Metrics) err
 			mes := ilm.Metrics()
 			for k := 0; k < mes.Len(); k++ {
 				metric := mes.At(k)
-				timeSeries = append(timeSeries, me.mapper.metricToTimeSeries(monitoredResource, metricLabels, metric)...)
+				for _, ts := range me.mapper.metricToTimeSeries(monitoredResource, metricLabels, metric) {
+					me.timeSeriesC <- ts
+				}
 
 				// We only send metric descriptors if we're configured *and* we're not sending service timeseries.
 				if me.cfg.MetricConfig.SkipCreateMetricDescriptor || me.cfg.MetricConfig.CreateServiceTimeSeries {
@@ -164,49 +190,84 @@ func (me *MetricsExporter) PushMetrics(ctx context.Context, m pdata.Metrics) err
 				}
 
 				for _, md := range me.mapper.metricDescriptor(metric) {
-					if md != nil {
-						select {
-						case me.mds <- md:
-						default:
-							// Ignore drops, we'll catch descriptor next time around.
-						}
+					if md == nil {
+						continue
+					}
+					select {
+					case me.metricDescriptorC <- md:
+					default:
+						// Ignore drops, we'll catch descriptor next time around.
 					}
 				}
 			}
 		}
 	}
 
-	// TODO: self observability
+	return nil
+}
+
+func (me *MetricsExporter) exportPendingTimeSerieses(ctx context.Context) {
+	var sendSize int
+	if len(me.pendingTimeSerieses) < sendBatchMaxSize {
+		sendSize = len(me.pendingTimeSerieses)
+	} else {
+		sendSize = sendBatchMaxSize
+	}
+
+	var ts []*monitoringpb.TimeSeries
+	ts, me.pendingTimeSerieses = me.pendingTimeSerieses[:sendSize], me.pendingTimeSerieses[sendSize:]
+
+	var err error
 	if me.cfg.MetricConfig.CreateServiceTimeSeries {
-		err := me.createServiceTimeSeries(ctx, timeSeries)
-		recordPointCount(ctx, len(timeSeries), m.DataPointCount()-len(timeSeries), err)
-		return err
+		err = me.createServiceTimeSeries(ctx, ts)
+	} else {
+		err = me.createTimeSeries(ctx, ts)
 	}
-	err := me.createTimeSeries(ctx, timeSeries)
-	recordPointCount(ctx, len(timeSeries), m.DataPointCount()-len(timeSeries), err)
-	return err
+
+	st := "OK"
+	if err != nil {
+		me.obs.log.Error("could not export time series to GCM", zap.Error(err))
+		if s, ok := status.FromError(err); ok {
+			st = statusCodeToString(s)
+		} else {
+			st = "UNKNOWN"
+		}
+	}
+
+	recordPointCountDataPoint(ctx, len(ts), st)
 }
 
 // Reads metric descriptors from the md channel, and reports them (once) to GCM.
 func (me *MetricsExporter) exportMetricDescriptorRunner() {
-	mdCache := make(map[string]*metricpb.MetricDescriptor)
+	defer me.goroutines.Done()
+
 	// We iterate over all metric descritpors until the channel is closed.
 	// Note: if we get terminated, this will still attempt to export all descriptors
 	// prior to shutdown.
-	for md := range me.mds {
-		// Not yet sent, now we sent it.
-		if mdCache[md.Type] == nil {
-			err := me.exportMetricDescriptor(context.TODO(), md)
-			// TODO: Log-once on error, per metric descriptor?
-			if err != nil {
+	for {
+		select {
+		case <-me.shutdownC:
+			for {
+				select {
+				case md := <-me.metricDescriptorC:
+					if err := me.exportMetricDescriptor(context.TODO(), md); err != nil {
+						me.obs.log.Error("Unable to send metric descriptor.", zap.Error(err), zap.Any("metric_descriptor", md))
+					}
+				default:
+					goto DONE
+				}
+			}
+		DONE:
+			// All buffered descriptors have been drained after the shutdown signal; stop.
+			return
+
+		case md := <-me.metricDescriptorC:
+			if err := me.exportMetricDescriptor(context.TODO(), md); err != nil {
+				// TODO: Log-once on error, per metric descriptor?
 				me.obs.log.Error("Unable to send metric descriptor.", zap.Error(err), zap.Any("metric_descriptor", md))
-				continue
 			}
-			mdCache[md.Type] = md
 		}
-		// TODO: We may want to compare current MD vs. previous and validate no changes.
 	}
-	close(me.mdsDone)
 }
 
 func (me *MetricsExporter) projectName() string {
@@ -216,7 +277,11 @@ func (me *MetricsExporter) projectName() string {
 
 // Helper method to send metric descriptors to GCM.
 func (me *MetricsExporter) exportMetricDescriptor(ctx context.Context, md *metricpb.MetricDescriptor) error {
-	// export
+	if _, exists := me.mdCache[md.Type]; exists {
+		return nil
+	}
+	me.mdCache[md.Type] = md
+
 	req := &monitoringpb.CreateMetricDescriptorRequest{
 		Name:             me.projectName(),
 		MetricDescriptor: md,
@@ -925,3 +990,52 @@ func mapMetricPointKind(m pdata.Metric) (metricpb.MetricDescriptor_MetricKind, m
 	}
 	return kind, typ
 }
+
+func (me *MetricsExporter) exportTimeSeriesRunner() {
+	defer me.goroutines.Done()
+	me.batchTimeoutTimer = time.NewTimer(batchTimeout)
+	for {
+		select {
+		case <-me.shutdownC:
+			for {
+				// Shutdown has been signalled. Drain any items
+				// still buffered on the channel before we flush
+				// the final batch and stop.
+				select {
+				case item := <-me.timeSeriesC:
+					me.processItem(item)
+				default:
+					goto DONE
+				}
+			}
+		DONE:
+			if len(me.pendingTimeSerieses) > 0 {
+				me.exportPendingTimeSerieses(context.TODO())
+			}
+			return
+		case item := <-me.timeSeriesC:
+			me.processItem(item)
+		case <-me.batchTimeoutTimer.C:
+			if len(me.pendingTimeSerieses) > 0 {
+				me.exportPendingTimeSerieses(context.TODO())
+			}
+			me.batchTimeoutTimer.Reset(batchTimeout)
+		}
+	}
+}
+
+func (me *MetricsExporter) processItem(ts *monitoringpb.TimeSeries) {
+	me.pendingTimeSerieses = append(me.pendingTimeSerieses, ts)
+	sent := false
+	for len(me.pendingTimeSerieses) >= sendBatchSize {
+		sent = true
+		me.exportPendingTimeSerieses(context.TODO())
+	}
+
+	if sent {
+		if !me.batchTimeoutTimer.Stop() {
+			<-me.batchTimeoutTimer.C
+		}
+		me.batchTimeoutTimer.Reset(batchTimeout)
+	}
+}
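
Note on the pattern: the standalone sketch below restates the batch-and-timeout loop that exportTimeSeriesRunner and processItem implement, stripped of the GCM specifics. Items arrive on a channel, the pending batch is flushed when it reaches the size threshold or when the timeout fires, and on shutdown the channel is drained and the remainder flushed. This is an illustration only, using nothing beyond the Go standard library; the batcher type, its fields, and the flush callback are hypothetical names and are not part of the exporter or of this patch.

package main

import (
	"fmt"
	"time"
)

const (
	exampleBatchTimeout  = time.Second
	exampleSendBatchSize = 200
)

// batcher collects items from itemC and flushes them either when the pending
// batch reaches exampleSendBatchSize or when exampleBatchTimeout elapses.
// On shutdown it drains the channel and flushes whatever remains.
type batcher struct {
	itemC     chan int
	shutdownC chan struct{}
	pending   []int
	timer     *time.Timer
	flush     func([]int)
}

func (b *batcher) run() {
	b.timer = time.NewTimer(exampleBatchTimeout)
	for {
		select {
		case <-b.shutdownC:
			// Drain anything still buffered, flush the remainder, and stop.
			for {
				select {
				case item := <-b.itemC:
					b.add(item)
				default:
					if len(b.pending) > 0 {
						b.flush(b.pending)
					}
					return
				}
			}
		case item := <-b.itemC:
			b.add(item)
		case <-b.timer.C:
			if len(b.pending) > 0 {
				b.flush(b.pending)
				b.pending = nil
			}
			b.timer.Reset(exampleBatchTimeout)
		}
	}
}

// add appends one item and flushes eagerly once a full batch has accumulated,
// restarting the timeout so the next batch gets a full window.
func (b *batcher) add(item int) {
	b.pending = append(b.pending, item)
	if len(b.pending) < exampleSendBatchSize {
		return
	}
	b.flush(b.pending)
	b.pending = nil
	if !b.timer.Stop() {
		<-b.timer.C
	}
	b.timer.Reset(exampleBatchTimeout)
}

func main() {
	b := &batcher{
		itemC:     make(chan int, 16),
		shutdownC: make(chan struct{}),
		flush: func(batch []int) {
			fmt.Printf("flushing %d items\n", len(batch))
		},
	}
	done := make(chan struct{})
	go func() {
		b.run()
		close(done)
	}()
	for i := 0; i < 450; i++ {
		b.itemC <- i
	}
	close(b.shutdownC)
	<-done
}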