Skip to content

Commit

Permalink
Initial implementation of WAL in queued_retry
Browse files Browse the repository at this point in the history
It is based on Jaeger's BoundedQueue interface and backed by go-diskqueue for WAL
  • Loading branch information
pmm-sumo committed May 19, 2021
1 parent 437ae6f commit 996a7fe
Show file tree
Hide file tree
Showing 18 changed files with 684 additions and 61 deletions.
39 changes: 38 additions & 1 deletion exporter/exporterhelper/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,49 @@ The following configuration options can be modified:
- `sending_queue`
- `enabled` (default = true)
- `num_consumers` (default = 10): Number of consumers that dequeue batches; ignored if `enabled` is `false`
- `queue_size` (default = 5000): Maximum number of batches kept in memory before data; ignored if `enabled` is `false`;
- `queue_size` (default = 5000): Maximum number of batches kept in memory before dropping data; ignored if `enabled` is `false` or WAL is enabled;
User should calculate this as `num_seconds * requests_per_second` where:
- `num_seconds` is the number of seconds to buffer in case of a backend outage
- `requests_per_second` is the average number of requests per second.
- `wal_directory` (default = empty): When set, enables Write-Ahead-Log (WAL) and specifies the directory where the log is stored. It should be unique for each exporter type
- `wal_sync_frequency` (default = 1s): When WAL is enabled, fsync is performed at the given frequency. Set to 0 to fsync on each item being produced/consumed.
- `resource_to_telemetry_conversion`
- `enabled` (default = false): If `enabled` is `true`, all the resource attributes will be converted to metric labels by default.
- `timeout` (default = 5s): Time to wait per individual attempt to send data to a backend.

The full list of settings exposed for this helper exporter are documented [here](factory.go).


### WAL

When `wal_directory` is set, the queue is buffered to disk. This currently has a limitation:
items that are being handled by a consumer are not backed by persistent storage, which means
that they might be lost in case of a sudden shutdown.

```
┌─Consumer #1─┐
Truncation │ ┌───┐ │
┌──on sync──┐ ┌───►│ │ 1 │ ├───► Success
│ │ │ │ │ └───┘ │
│ │ │ │ │ │
│ │ │ │ └─────────────┘
│ │ │ │
┌─────────WAL-backed queue────┴─────┴───┐ │ ┌─Consumer #2─┐
│ │ │ │ ┌───┐ │
│ ┌───┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐ │ │ │ │ 2 │ ├───► Permanent
│ n+1 │ n │ ... │ 4 │ │ 3 │ │ 2 │ │ 1 │ ├────┼───►│ └───┘ │ failure
│ └───┘ └───┘ └───┘ └───┘ └───┘ │ │ │ │
│ │ │ └─────────────┘
└───────────────────────────────────────┘ │
▲ ▲ │ ┌─Consumer #3─┐
│ │ │ │ ┌───┐ │ Temporary
│ │ └───►│ │ 3 │ ├───► failure
write read │ └───┘ │
index index │ │ │
▲ └─────────────┘ │
│ ▲ │
│ └── Retry ───────┤
│ │
│ │
└───────────────────────── Requeuing ◄────────── Retry limit exceeded ─┘
```
26 changes: 21 additions & 5 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package exporterhelper

import (
"context"
"fmt"
"time"

"go.uber.org/zap"
Expand Down Expand Up @@ -52,8 +53,12 @@ type request interface {
onError(error) request
// Returns the count of spans/metric points or log records.
count() int
marshall() ([]byte, error)
}

// requestUnmarshaler defines a function which takes a serialized request as a byte slice
// and unmarshals it into the relevant request, or returns an error.
type requestUnmarshaler func([]byte) (request, error)

// requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs).
type requestSender interface {
send(req request) error
Expand Down Expand Up @@ -164,19 +169,26 @@ func WithResourceToTelemetryConversion(resourceToTelemetrySettings ResourceToTel
// baseExporter contains common fields between different exporter types.
type baseExporter struct {
component.Component
sender requestSender
qrSender *queuedRetrySender
sender requestSender
qrSender *queuedRetrySender
// signalType is assigned through setSignalType and is read by newBaseExporter
// when it builds the sender ID.
signalType string
}

// newBaseExporter builds a baseExporter from the given settings and wires a
// queued-retry sender (wrapping a timeoutSender) as its sender.
// reqUnnmarshaler is handed to newQueuedRetrySender; any error returned by
// newQueuedRetrySender is returned to the caller with a nil exporter.
func newBaseExporter(cfg config.Exporter, logger *zap.Logger, bs *baseSettings) *baseExporter {
func newBaseExporter(cfg config.Exporter, logger *zap.Logger, bs *baseSettings, reqUnnmarshaler requestUnmarshaler) (*baseExporter, error) {
var err error
be := &baseExporter{
Component: componenthelper.New(bs.componentOptions...),
}

be.qrSender = newQueuedRetrySender(cfg.ID().String(), bs.QueueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, logger)
// We need specific ID when WAL is used, so single configuration could be used for several signals without risk of WAL file name collisions
// NOTE(review): be.signalType has not been assigned at this point; the callers
// visible in this change invoke setSignalType only after newBaseExporter returns,
// so the suffix formatted here looks like it is always empty and the sender ID
// would not differ per signal — confirm and consider passing the signal type in.
senderID := fmt.Sprintf("%s-%s", cfg.ID().String(), be.signalType)
be.qrSender, err = newQueuedRetrySender(senderID, bs.QueueSettings, bs.RetrySettings, reqUnnmarshaler, &timeoutSender{cfg: bs.TimeoutSettings}, logger)
if err != nil {
return nil, err
}
be.sender = be.qrSender

return be
return be, nil
}

// wrapConsumerSender wraps the consumer sender (the sender that uses retries and timeout) with the given wrapper.
Expand All @@ -185,6 +197,10 @@ func (be *baseExporter) wrapConsumerSender(f func(consumer requestSender) reques
be.qrSender.consumerSender = f(be.qrSender.consumerSender)
}

// setSignalType stores the signal type on the exporter.
// NOTE(review): newBaseExporter reads signalType while building the sender ID,
// so calling this after construction does not change that ID — confirm intent.
func (be *baseExporter) setSignalType(signalType string) {
be.signalType = signalType
}

// Start all senders and exporter and is invoked during service start.
func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
// First start the wrapped exporter.
Expand Down
20 changes: 18 additions & 2 deletions exporter/exporterhelper/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
"errors"
"testing"

"go.opentelemetry.io/collector/consumer/consumerhelper"
"go.opentelemetry.io/collector/consumer/pdata"

"github.com/stretchr/testify/require"
"go.opencensus.io/tag"
"go.opencensus.io/trace"
Expand All @@ -44,22 +47,25 @@ func TestErrorToStatus(t *testing.T) {
}

// TestBaseExporter checks that a base exporter created with no options can be
// constructed, started and shut down without returning an error.
func TestBaseExporter(t *testing.T) {
be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions())
be, err := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(), nopRequestUnmarshaler())
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
}

// TestBaseExporterWithOptions checks that the errors returned by the callbacks
// registered through WithStart and WithShutdown are the ones returned by
// Start and Shutdown of the base exporter.
func TestBaseExporterWithOptions(t *testing.T) {
want := errors.New("my error")
be := newBaseExporter(
be, err := newBaseExporter(
&defaultExporterCfg,
zap.NewNop(),
fromOptions(
WithStart(func(ctx context.Context, host component.Host) error { return want }),
WithShutdown(func(ctx context.Context) error { return want }),
WithResourceToTelemetryConversion(defaultResourceToTelemetrySettings()),
WithTimeout(DefaultTimeoutSettings())),
nopRequestUnmarshaler(),
)
// Construction itself is expected to succeed; only Start/Shutdown return want.
require.NoError(t, err)
require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost()))
require.Equal(t, want, be.Shutdown(context.Background()))
}
Expand All @@ -70,3 +76,13 @@ func errToStatus(err error) trace.Status {
}
return okStatus
}

// nopTracePusher returns a traces pusher that ignores its arguments and
// always reports success.
func nopTracePusher() consumerhelper.ConsumeTracesFunc {
	return func(context.Context, pdata.Traces) error { return nil }
}

// nopRequestUnmarshaler returns a trace request unmarshaler whose requests
// are bound to the no-op traces pusher.
func nopRequestUnmarshaler() requestUnmarshaler {
	pusher := nopTracePusher()
	return newTraceRequestUnmarshalerFunc(pusher)
}
23 changes: 23 additions & 0 deletions exporter/exporterhelper/consumers_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package exporterhelper

// consumersQueue is the queue abstraction used by this package.
// This is largely based on queue.BoundedQueue and matches the subset used in the collector
type consumersQueue interface {
// StartConsumers takes the number of consumers and the callback that receives each item.
// NOTE(review): presumably spawns num consumers that invoke callback per dequeued item — confirm against implementations.
StartConsumers(num int, callback func(item interface{}))
// Produce offers an item to the queue and reports a boolean result.
// NOTE(review): presumably true means the item was accepted — confirm.
Produce(item interface{}) bool
// Stop stops the queue.
// NOTE(review): whether it waits for in-flight items is not visible here — confirm.
Stop()
// Size returns an int describing the queue size.
// NOTE(review): presumably the current number of queued items — confirm.
Size() int
}
23 changes: 22 additions & 1 deletion exporter/exporterhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,19 @@ func newLogsRequest(ctx context.Context, ld pdata.Logs, pusher consumerhelper.Co
}
}

// newLogsRequestUnmarshalerFunc returns a requestUnmarshaler that decodes
// OTLP protobuf bytes into pdata.Logs and wraps them in a logs request bound
// to pusher. Decoding errors are returned unchanged.
func newLogsRequestUnmarshalerFunc(pusher consumerhelper.ConsumeLogsFunc) requestUnmarshaler {
	decode := func(payload []byte) (request, error) {
		ld, decodeErr := pdata.LogsFromOtlpProtoBytes(payload)
		if decodeErr != nil {
			return nil, decodeErr
		}

		// FIXME unmarshall context
		// The original request context is not serialized, so the rebuilt
		// request starts from a background context.
		ctx := context.Background()
		return newLogsRequest(ctx, ld, pusher), nil
	}
	return decode
}

func (req *logsRequest) onError(err error) request {
var logError consumererror.Logs
if consumererror.AsLogs(err, &logError) {
Expand All @@ -55,6 +68,10 @@ func (req *logsRequest) export(ctx context.Context) error {
return req.pusher(ctx, req.ld)
}

// marshall serializes the request's logs via ToOtlpProtoBytes and returns the
// resulting bytes together with any error from that call.
func (req *logsRequest) marshall() ([]byte, error) {
return req.ld.ToOtlpProtoBytes()
}

// count returns the number of log records carried by the request.
func (req *logsRequest) count() int {
return req.ld.LogRecordCount()
}
Expand Down Expand Up @@ -84,7 +101,11 @@ func NewLogsExporter(
}

bs := fromOptions(options...)
be := newBaseExporter(cfg, logger, bs)
be, err := newBaseExporter(cfg, logger, bs, newLogsRequestUnmarshalerFunc(pusher))
if err != nil {
return nil, err
}
be.setSignalType("logs")
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
return &logsExporterWithObservability{
obsrep: obsreport.NewExporter(obsreport.ExporterSettings{
Expand Down
23 changes: 22 additions & 1 deletion exporter/exporterhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,19 @@ func newMetricsRequest(ctx context.Context, md pdata.Metrics, pusher consumerhel
}
}

// newMetricsRequestUnmarshalerFunc returns a requestUnmarshaler that decodes
// OTLP protobuf bytes into pdata.Metrics and wraps them in a metrics request
// bound to pusher. Decoding errors are returned unchanged.
func newMetricsRequestUnmarshalerFunc(pusher consumerhelper.ConsumeMetricsFunc) requestUnmarshaler {
	decode := func(payload []byte) (request, error) {
		md, decodeErr := pdata.MetricsFromOtlpProtoBytes(payload)
		if decodeErr != nil {
			return nil, decodeErr
		}

		// FIXME unmarshall context
		// The original request context is not serialized, so the rebuilt
		// request starts from a background context.
		ctx := context.Background()
		return newMetricsRequest(ctx, md, pusher), nil
	}
	return decode
}

func (req *metricsRequest) onError(err error) request {
var metricsError consumererror.Metrics
if consumererror.AsMetrics(err, &metricsError) {
Expand All @@ -55,6 +68,10 @@ func (req *metricsRequest) export(ctx context.Context) error {
return req.pusher(ctx, req.md)
}

// marshall serializes the request's metrics via ToOtlpProtoBytes and returns
// the resulting bytes together with any error from that call.
func (req *metricsRequest) marshall() ([]byte, error) {
return req.md.ToOtlpProtoBytes()
}

func (req *metricsRequest) count() int {
_, numPoints := req.md.MetricAndDataPointCount()
return numPoints
Expand Down Expand Up @@ -85,7 +102,11 @@ func NewMetricsExporter(
}

bs := fromOptions(options...)
be := newBaseExporter(cfg, logger, bs)
be, err := newBaseExporter(cfg, logger, bs, newMetricsRequestUnmarshalerFunc(pusher))
if err != nil {
return nil, err
}
be.setSignalType("metrics")
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
return &metricsSenderWithObservability{
obsrep: obsreport.NewExporter(obsreport.ExporterSettings{
Expand Down
Loading

0 comments on commit 996a7fe

Please sign in to comment.