From 996a7fef98eff82abaf93784ff30c01a7f522618 Mon Sep 17 00:00:00 2001 From: Przemek Maciolek Date: Wed, 19 May 2021 18:18:47 +0200 Subject: [PATCH] Initial implementation of WAL in queued_retry It is based on Jaeger's BoundedQueue interface and backed by go-diskqueue for WAL --- exporter/exporterhelper/README.md | 39 +++- exporter/exporterhelper/common.go | 26 ++- exporter/exporterhelper/common_test.go | 20 +- exporter/exporterhelper/consumers_queue.go | 23 +++ exporter/exporterhelper/logs.go | 23 ++- exporter/exporterhelper/metrics.go | 23 ++- exporter/exporterhelper/queued_retry.go | 87 ++++++-- exporter/exporterhelper/queued_retry_test.go | 53 +++-- exporter/exporterhelper/traces.go | 29 ++- exporter/exporterhelper/wal.go | 200 +++++++++++++++++++ exporter/exporterhelper/wal_test.go | 184 +++++++++++++++++ exporter/jaegerexporter/config_test.go | 7 +- exporter/kafkaexporter/config_test.go | 7 +- exporter/otlpexporter/config_test.go | 7 +- exporter/otlphttpexporter/config_test.go | 7 +- exporter/zipkinexporter/config_test.go | 7 +- go.mod | 1 + go.sum | 2 + 18 files changed, 684 insertions(+), 61 deletions(-) create mode 100644 exporter/exporterhelper/consumers_queue.go create mode 100644 exporter/exporterhelper/wal.go create mode 100644 exporter/exporterhelper/wal_test.go diff --git a/exporter/exporterhelper/README.md b/exporter/exporterhelper/README.md index 1b1bdc42639..798c9bff8b6 100644 --- a/exporter/exporterhelper/README.md +++ b/exporter/exporterhelper/README.md @@ -17,12 +17,49 @@ The following configuration options can be modified: - `sending_queue` - `enabled` (default = true) - `num_consumers` (default = 10): Number of consumers that dequeue batches; ignored if `enabled` is `false` - - `queue_size` (default = 5000): Maximum number of batches kept in memory before data; ignored if `enabled` is `false`; + - `queue_size` (default = 5000): Maximum number of batches kept in memory before dropping data; ignored if `enabled` is `false` or WAL is enabled; User should 
calculate this as `num_seconds * requests_per_second` where: - `num_seconds` is the number of seconds to buffer in case of a backend outage - `requests_per_second` is the average number of requests per seconds. + - `wal_directory` (default = empty): When set, enables the Write-Ahead-Log (WAL) and specifies the directory where the log is stored. It should be unique for each exporter type. + - `wal_sync_frequency` (default = 1s): When WAL is enabled, the log is fsynced at this interval. Set to 0 to fsync on each item being produced/consumed. - `resource_to_telemetry_conversion` - `enabled` (default = false): If `enabled` is `true`, all the resource attributes will be converted to metric labels by default. - `timeout` (default = 5s): Time to wait per individual attempt to send data to a backend. The full list of settings exposed for this helper exporter are documented [here](factory.go). + + +### WAL + +When `wal_directory` is set, the queue is buffered to disk. This currently has a limitation: +items that are being handled by a consumer are not backed by persistent storage, which means +that in case of a sudden shutdown they might be lost. + +``` + ┌─Consumer #1─┐ + Truncation │ ┌───┐ │ + ┌──on sync──┐ ┌───►│ │ 1 │ ├───► Success + │ │ │ │ │ └───┘ │ + │ │ │ │ │ │ + │ │ │ │ └─────────────┘ + │ │ │ │ + ┌─────────WAL-backed queue────┴─────┴───┐ │ ┌─Consumer #2─┐ + │ │ │ │ ┌───┐ │ + │ ┌───┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐ │ │ │ │ 2 │ ├───► Permanent + │ n+1 │ n │ ... 
│ 4 │ │ 3 │ │ 2 │ │ 1 │ ├────┼───►│ └───┘ │ failure + │ └───┘ └───┘ └───┘ └───┘ └───┘ │ │ │ │ + │ │ │ └─────────────┘ + └───────────────────────────────────────┘ │ + ▲ ▲ │ ┌─Consumer #3─┐ + │ │ │ │ ┌───┐ │ Temporary + │ │ └───►│ │ 3 │ ├───► failure + write read │ └───┘ │ + index index │ │ │ + ▲ └─────────────┘ │ + │ ▲ │ + │ └── Retry ───────┤ + │ │ + │ │ + └───────────────────────── Requeuing ◄────────── Retry limit exceeded ─┘ +``` \ No newline at end of file diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 27d533f5fd8..d670461b594 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -16,6 +16,7 @@ package exporterhelper import ( "context" + "fmt" "time" "go.uber.org/zap" @@ -52,8 +53,12 @@ type request interface { onError(error) request // Returns the count of spans/metric points or log records. count() int + marshall() ([]byte, error) } +// requestUnmarshaler defines a function which can take a byte stream and unmarshal it into a relevant request +type requestUnmarshaler func([]byte) (request, error) + // requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs). type requestSender interface { send(req request) error @@ -164,19 +169,26 @@ func WithResourceToTelemetryConversion(resourceToTelemetrySettings ResourceToTel // baseExporter contains common fields between different exporter types. 
type baseExporter struct { component.Component - sender requestSender - qrSender *queuedRetrySender + sender requestSender + qrSender *queuedRetrySender + signalType string } -func newBaseExporter(cfg config.Exporter, logger *zap.Logger, bs *baseSettings) *baseExporter { +func newBaseExporter(cfg config.Exporter, logger *zap.Logger, bs *baseSettings, reqUnmarshaler requestUnmarshaler) (*baseExporter, error) { + var err error be := &baseExporter{ Component: componenthelper.New(bs.componentOptions...), } - be.qrSender = newQueuedRetrySender(cfg.ID().String(), bs.QueueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, logger) + // We need a specific ID when WAL is used, so that a single configuration can be used for several signals without risk of WAL file name collisions. NOTE(review): be.signalType is still empty at this point (callers invoke setSignalType only after newBaseExporter returns), so senderID always ends with "-" and the per-signal suffix is never applied; the signal type needs to be passed into newBaseExporter. + senderID := fmt.Sprintf("%s-%s", cfg.ID().String(), be.signalType) + be.qrSender, err = newQueuedRetrySender(senderID, bs.QueueSettings, bs.RetrySettings, reqUnmarshaler, &timeoutSender{cfg: bs.TimeoutSettings}, logger) + if err != nil { + return nil, err + } be.sender = be.qrSender - return be + return be, nil } // wrapConsumerSender wraps the consumer sender (the sender that uses retries and timeout) with the given wrapper. @@ -185,6 +197,10 @@ func (be *baseExporter) wrapConsumerSender(f func(consumer requestSender) reques be.qrSender.consumerSender = f(be.qrSender.consumerSender) } +func (be *baseExporter) setSignalType(signalType string) { + be.signalType = signalType +} + // Start all senders and exporter and is invoked during service start. func (be *baseExporter) Start(ctx context.Context, host component.Host) error { // First start the wrapped exporter. 
diff --git a/exporter/exporterhelper/common_test.go b/exporter/exporterhelper/common_test.go index c200bca7049..a63bfe953dc 100644 --- a/exporter/exporterhelper/common_test.go +++ b/exporter/exporterhelper/common_test.go @@ -18,6 +18,9 @@ import ( "errors" "testing" + "go.opentelemetry.io/collector/consumer/consumerhelper" + "go.opentelemetry.io/collector/consumer/pdata" + "github.com/stretchr/testify/require" "go.opencensus.io/tag" "go.opencensus.io/trace" @@ -44,14 +47,15 @@ func TestErrorToStatus(t *testing.T) { } func TestBaseExporter(t *testing.T) { - be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions()) + be, err := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(), nopRequestUnmarshaler()) + require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, be.Shutdown(context.Background())) } func TestBaseExporterWithOptions(t *testing.T) { want := errors.New("my error") - be := newBaseExporter( + be, err := newBaseExporter( &defaultExporterCfg, zap.NewNop(), fromOptions( @@ -59,7 +63,9 @@ func TestBaseExporterWithOptions(t *testing.T) { WithShutdown(func(ctx context.Context) error { return want }), WithResourceToTelemetryConversion(defaultResourceToTelemetrySettings()), WithTimeout(DefaultTimeoutSettings())), + nopRequestUnmarshaler(), ) + require.NoError(t, err) require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost())) require.Equal(t, want, be.Shutdown(context.Background())) } @@ -70,3 +76,13 @@ func errToStatus(err error) trace.Status { } return okStatus } + +func nopTracePusher() consumerhelper.ConsumeTracesFunc { + return func(ctx context.Context, ld pdata.Traces) error { + return nil + } +} + +func nopRequestUnmarshaler() requestUnmarshaler { + return newTraceRequestUnmarshalerFunc(nopTracePusher()) +} diff --git a/exporter/exporterhelper/consumers_queue.go b/exporter/exporterhelper/consumers_queue.go new file mode 100644 index 
00000000000..4a873b2d9e7 --- /dev/null +++ b/exporter/exporterhelper/consumers_queue.go @@ -0,0 +1,23 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exporterhelper + +// This is largely based on queue.BoundedQueue and matches the subset used in the collector +type consumersQueue interface { + StartConsumers(num int, callback func(item interface{})) + Produce(item interface{}) bool + Stop() + Size() int +} diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index c9512108252..2eb421bb720 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -43,6 +43,19 @@ func newLogsRequest(ctx context.Context, ld pdata.Logs, pusher consumerhelper.Co } } +func newLogsRequestUnmarshalerFunc(pusher consumerhelper.ConsumeLogsFunc) requestUnmarshaler { + return func(bytes []byte) (request, error) { + logs, err := pdata.LogsFromOtlpProtoBytes(bytes) + if err != nil { + return nil, err + } + + // FIXME unmarshall context + + return newLogsRequest(context.Background(), logs, pusher), nil + } +} + func (req *logsRequest) onError(err error) request { var logError consumererror.Logs if consumererror.AsLogs(err, &logError) { @@ -55,6 +68,10 @@ func (req *logsRequest) export(ctx context.Context) error { return req.pusher(ctx, req.ld) } +func (req *logsRequest) marshall() ([]byte, error) { + return req.ld.ToOtlpProtoBytes() +} + func (req *logsRequest) count() int { return 
req.ld.LogRecordCount() } @@ -84,7 +101,11 @@ func NewLogsExporter( } bs := fromOptions(options...) - be := newBaseExporter(cfg, logger, bs) + be, err := newBaseExporter(cfg, logger, bs, newLogsRequestUnmarshalerFunc(pusher)) + if err != nil { + return nil, err + } + be.setSignalType("logs") be.wrapConsumerSender(func(nextSender requestSender) requestSender { return &logsExporterWithObservability{ obsrep: obsreport.NewExporter(obsreport.ExporterSettings{ diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index 8ad834b0731..4f7f99b7ca2 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -43,6 +43,19 @@ func newMetricsRequest(ctx context.Context, md pdata.Metrics, pusher consumerhel } } +func newMetricsRequestUnmarshalerFunc(pusher consumerhelper.ConsumeMetricsFunc) requestUnmarshaler { + return func(bytes []byte) (request, error) { + metrics, err := pdata.MetricsFromOtlpProtoBytes(bytes) + if err != nil { + return nil, err + } + + // FIXME unmarshall context + + return newMetricsRequest(context.Background(), metrics, pusher), nil + } +} + func (req *metricsRequest) onError(err error) request { var metricsError consumererror.Metrics if consumererror.AsMetrics(err, &metricsError) { @@ -55,6 +68,10 @@ func (req *metricsRequest) export(ctx context.Context) error { return req.pusher(ctx, req.md) } +func (req *metricsRequest) marshall() ([]byte, error) { + return req.md.ToOtlpProtoBytes() +} + func (req *metricsRequest) count() int { _, numPoints := req.md.MetricAndDataPointCount() return numPoints @@ -85,7 +102,11 @@ func NewMetricsExporter( } bs := fromOptions(options...) 
- be := newBaseExporter(cfg, logger, bs) + be, err := newBaseExporter(cfg, logger, bs, newMetricsRequestUnmarshalerFunc(pusher)) + if err != nil { + return nil, err + } + be.setSignalType("metrics") be.wrapConsumerSender(func(nextSender requestSender) requestSender { return &metricsSenderWithObservability{ obsrep: obsreport.NewExporter(obsreport.ExporterSettings{ diff --git a/exporter/exporterhelper/queued_retry.go b/exporter/exporterhelper/queued_retry.go index ae68dafc467..f99cd3c1028 100644 --- a/exporter/exporterhelper/queued_retry.go +++ b/exporter/exporterhelper/queued_retry.go @@ -55,6 +55,10 @@ type QueueSettings struct { NumConsumers int `mapstructure:"num_consumers"` // QueueSize is the maximum number of batches allowed in queue at a given time. QueueSize int `mapstructure:"queue_size"` + // WalDirectory describes where Write-Ahead-Log should be stored. If empty (default) no WAL used + WalDirectory string `mapstructure:"wal_directory"` + // WalSyncFrequency describes how frequently the WAL should be synced. If set to zero, it's synced after each operation + WalSyncFrequency time.Duration `mapstructure:"wal_sync_frequency"` } // DefaultQueueSettings returns the default settings for QueueSettings. @@ -66,7 +70,9 @@ func DefaultQueueSettings() QueueSettings { // This is a pretty decent value for production. // User should calculate this from the perspective of how many seconds to buffer in case of a backend outage, // multiply that by the number of requests per seconds. 
- QueueSize: 5000, + QueueSize: 5000, + WalDirectory: "", + WalSyncFrequency: 1 * time.Second, } } @@ -99,7 +105,7 @@ type queuedRetrySender struct { fullName string cfg QueueSettings consumerSender requestSender - queue *queue.BoundedQueue + queue consumersQueue retryStopCh chan struct{} traceAttributes []trace.Attribute logger *zap.Logger @@ -124,25 +130,64 @@ func createSampledLogger(logger *zap.Logger) *zap.Logger { return logger.WithOptions(opts) } -func newQueuedRetrySender(fullName string, qCfg QueueSettings, rCfg RetrySettings, nextSender requestSender, logger *zap.Logger) *queuedRetrySender { +func newQueuedRetrySender(fullName string, qCfg QueueSettings, rCfg RetrySettings, reqUnmarshaler requestUnmarshaler, nextSender requestSender, logger *zap.Logger) (*queuedRetrySender, error) { retryStopCh := make(chan struct{}) sampledLogger := createSampledLogger(logger) traceAttr := trace.StringAttribute(obsreport.ExporterKey, fullName) + walEnabled := qCfg.WalDirectory != "" + + var err error + var _queue consumersQueue + var onTemporaryFailure onRequestTemporaryFailureFunc + + if walEnabled { + _queue, err = newWALQueue(logger, qCfg.WalDirectory, fullName, qCfg.WalSyncFrequency, reqUnmarshaler) + onTemporaryFailure = func(req request, err error) error { + if _queue.Produce(req) { + sampledLogger.Error( + "Exporting failed. Putting back to the end of the queue.", + zap.Error(err), + ) + } else { + sampledLogger.Error( + "Exporting failed. Queue did not accept requeuing request. Dropping data.", + zap.Error(err), + zap.Int("dropped_items", req.count()), + ) + } + return err + } + if err != nil { + return nil, err + } + } else { + _queue = queue.NewBoundedQueue(qCfg.QueueSize, func(item interface{}) {}) + onTemporaryFailure = func(req request, err error) error { + sampledLogger.Error( + "Exporting failed. No more retries left. 
Dropping data.", + zap.Error(err), + zap.Int("dropped_items", req.count()), + ) + return err + } + } + return &queuedRetrySender{ fullName: fullName, cfg: qCfg, consumerSender: &retrySender{ - traceAttribute: traceAttr, - cfg: rCfg, - nextSender: nextSender, - stopCh: retryStopCh, - logger: sampledLogger, + traceAttribute: traceAttr, + cfg: rCfg, + nextSender: nextSender, + stopCh: retryStopCh, + logger: sampledLogger, + onTemporaryFailure: onTemporaryFailure, }, - queue: queue.NewBoundedQueue(qCfg.QueueSize, func(item interface{}) {}), + queue: _queue, retryStopCh: retryStopCh, traceAttributes: []trace.Attribute{traceAttr}, logger: sampledLogger, - } + }, nil } // start is invoked during service startup. @@ -205,7 +250,7 @@ func (qrs *queuedRetrySender) shutdown() { }, metricdata.NewLabelValue(qrs.fullName)) } - // First stop the retry goroutines, so that unblocks the queue workers. + // First stop the retry goroutines, so that unblocks the queue numWorkers. close(qrs.retryStopCh) // Stop the queued sender, this will drain the queue and will call the retry (which is stopped) that will only @@ -227,12 +272,15 @@ func NewThrottleRetry(err error, delay time.Duration) error { } } +type onRequestTemporaryFailureFunc func(request, error) error + type retrySender struct { - traceAttribute trace.Attribute - cfg RetrySettings - nextSender requestSender - stopCh chan struct{} - logger *zap.Logger + traceAttribute trace.Attribute + cfg RetrySettings + nextSender requestSender + stopCh chan struct{} + logger *zap.Logger + onTemporaryFailure onRequestTemporaryFailureFunc } // send implements the requestSender interface @@ -292,12 +340,7 @@ func (rs *retrySender) send(req request) error { if backoffDelay == backoff.Stop { // throw away the batch err = fmt.Errorf("max elapsed time expired %w", err) - rs.logger.Error( - "Exporting failed. No more retries left. 
Dropping data.", - zap.Error(err), - zap.Int("dropped_items", req.count()), - ) - return err + return rs.onTemporaryFailure(req, err) } if throttleErr, isThrottle := err.(*throttleRetry); isThrottle { diff --git a/exporter/exporterhelper/queued_retry_test.go b/exporter/exporterhelper/queued_retry_test.go index 90465bb9bb1..2ac364d6996 100644 --- a/exporter/exporterhelper/queued_retry_test.go +++ b/exporter/exporterhelper/queued_retry_test.go @@ -23,6 +23,8 @@ import ( "testing" "time" + "go.opentelemetry.io/collector/consumer/pdata" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opencensus.io/metric/metricdata" @@ -36,10 +38,18 @@ import ( "go.opentelemetry.io/collector/obsreport/obsreporttest" ) +func mockRequestUnmarshaler(mr *mockRequest) requestUnmarshaler { + return func(bytes []byte) (request, error) { + return mr, nil + } +} + func TestQueuedRetry_DropOnPermanentError(t *testing.T) { qCfg := DefaultQueueSettings() rCfg := DefaultRetrySettings() - be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg))) + mockR := newMockRequest(context.Background(), 2, consumererror.Permanent(errors.New("bad data"))) + be, err := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), mockRequestUnmarshaler(mockR)) + require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -47,7 +57,6 @@ func TestQueuedRetry_DropOnPermanentError(t *testing.T) { assert.NoError(t, be.Shutdown(context.Background())) }) - mockR := newMockRequest(context.Background(), 2, consumererror.Permanent(errors.New("bad data"))) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. 
require.NoError(t, be.sender.send(mockR)) @@ -63,7 +72,8 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) { qCfg := DefaultQueueSettings() rCfg := DefaultRetrySettings() rCfg.Enabled = false - be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg))) + be, err := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), nopRequestUnmarshaler()) + require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -88,7 +98,8 @@ func TestQueuedRetry_OnError(t *testing.T) { qCfg.NumConsumers = 1 rCfg := DefaultRetrySettings() rCfg.InitialInterval = 0 - be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg))) + be, err := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), nopRequestUnmarshaler()) + require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -114,7 +125,8 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) { qCfg := DefaultQueueSettings() qCfg.NumConsumers = 1 rCfg := DefaultRetrySettings() - be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg))) + be, err := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), nopRequestUnmarshaler()) + require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -147,7 +159,8 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) { qCfg := DefaultQueueSettings() qCfg.NumConsumers = 1 rCfg := DefaultRetrySettings() - 
be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg))) + be, err := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), nopRequestUnmarshaler()) + require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -176,7 +189,8 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) { rCfg := DefaultRetrySettings() rCfg.InitialInterval = time.Millisecond rCfg.MaxElapsedTime = 100 * time.Millisecond - be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg))) + be, err := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), nopRequestUnmarshaler()) + require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -214,7 +228,8 @@ func TestQueuedRetry_ThrottleError(t *testing.T) { qCfg.NumConsumers = 1 rCfg := DefaultRetrySettings() rCfg.InitialInterval = 10 * time.Millisecond - be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg))) + be, err := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), nopRequestUnmarshaler()) + require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -245,7 +260,8 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { qCfg.QueueSize = 1 rCfg := DefaultRetrySettings() rCfg.InitialInterval = 0 - be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg))) + be, err := newBaseExporter(&defaultExporterCfg, 
zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), nopRequestUnmarshaler()) + require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -271,14 +287,15 @@ func TestQueuedRetry_DropOnFull(t *testing.T) { qCfg := DefaultQueueSettings() qCfg.QueueSize = 0 rCfg := DefaultRetrySettings() - be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg))) + be, err := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), nopRequestUnmarshaler()) + require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) }) - err := be.sender.send(newMockRequest(context.Background(), 2, errors.New("transient error"))) + err = be.sender.send(newMockRequest(context.Background(), 2, errors.New("transient error"))) require.Error(t, err) } @@ -289,7 +306,8 @@ func TestQueuedRetryHappyPath(t *testing.T) { qCfg := DefaultQueueSettings() rCfg := DefaultRetrySettings() - be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg))) + be, err := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), nopRequestUnmarshaler()) + require.NoError(t, err) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -323,7 +341,8 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) { qCfg := DefaultQueueSettings() qCfg.NumConsumers = 0 // to make every request go straight to the queue rCfg := DefaultRetrySettings() - be := 
newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg))) + be, err := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), nopRequestUnmarshaler()) + require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) for i := 0; i < 7; i++ { @@ -363,6 +382,10 @@ func (mer *mockErrorRequest) onError(error) request { return mer } +func (mer *mockErrorRequest) marshall() ([]byte, error) { + return nil, nil +} + func (mer *mockErrorRequest) count() int { return 7 } @@ -394,6 +417,10 @@ func (m *mockRequest) export(ctx context.Context) error { return ctx.Err() } +func (m *mockRequest) marshall() ([]byte, error) { + return pdata.NewTraces().ToOtlpProtoBytes() +} + func (m *mockRequest) onError(error) request { return &mockRequest{ baseRequest: m.baseRequest, diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index eae4b16c5da..a2ded4067f8 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -43,6 +43,29 @@ func newTracesRequest(ctx context.Context, td pdata.Traces, pusher consumerhelpe } } +func newTraceRequestUnmarshalerFunc(pusher consumerhelper.ConsumeTracesFunc) requestUnmarshaler { + return func(bytes []byte) (request, error) { + traces, err := pdata.TracesFromOtlpProtoBytes(bytes) + if err != nil { + return nil, err + } + + // TODO unmarshall context + + return newTracesRequest(context.Background(), traces, pusher), nil + } +} + +func (req *tracesRequest) marshall() ([]byte, error) { + // Unfortunately, this is perhaps the only type of context which might be safely checked against + // TODO: handle serializing context + // c, ok := client.FromContext(req.context()) + // if ok { + // ... 
+ // } + return req.td.ToOtlpProtoBytes() +} + func (req *tracesRequest) onError(err error) request { var traceError consumererror.Traces if consumererror.AsTraces(err, &traceError) { @@ -85,7 +108,11 @@ func NewTracesExporter( } bs := fromOptions(options...) - be := newBaseExporter(cfg, logger, bs) + be, err := newBaseExporter(cfg, logger, bs, newTraceRequestUnmarshalerFunc(pusher)) + if err != nil { + return nil, err + } + be.setSignalType("traces") be.wrapConsumerSender(func(nextSender requestSender) requestSender { return &tracesExporterWithObservability{ obsrep: obsreport.NewExporter( diff --git a/exporter/exporterhelper/wal.go b/exporter/exporterhelper/wal.go new file mode 100644 index 00000000000..256adf07771 --- /dev/null +++ b/exporter/exporterhelper/wal.go @@ -0,0 +1,200 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package exporterhelper + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "sync" + "time" + + "github.com/jaegertracing/jaeger/pkg/queue" + "github.com/nsqio/go-diskqueue" + "go.uber.org/atomic" + "go.uber.org/zap" +) + +// WALQueue holds the WAL-backed queue +type WALQueue struct { + logger *zap.Logger + stopWG sync.WaitGroup + unmarshaler requestUnmarshaler + wal diskqueue.Interface + numWorkers int + stopped *atomic.Uint32 + quit chan struct{} + quitOnce sync.Once +} + +const ( + // TODO: expose those as configuration parameters + maxBytesPerFile = int64(1 << 28) + minMsgSize = int32(1) + maxMsgSize = int32(1 << 27) + syncEvery = int64(1000) +) + +var ( + errNotFound = errors.New("not found") +) + +func dqLogOption(logger *zap.Logger, lvl diskqueue.LogLevel, f string, args ...interface{}) { + if logger == nil { + return + } + + switch lvl { + case diskqueue.DEBUG: + logger.Debug(fmt.Sprintf(f, args...)) + case diskqueue.INFO: + logger.Info(fmt.Sprintf(f, args...)) + case diskqueue.WARN: + logger.Warn(fmt.Sprintf(f, args...)) + case diskqueue.ERROR: + logger.Error(fmt.Sprintf(f, args...)) + case diskqueue.FATAL: + logger.Fatal(fmt.Sprintf(f, args...)) + } +} + +func newWALQueue(logger *zap.Logger, path string, id string, syncFrequency time.Duration, unmarshaler requestUnmarshaler) (*WALQueue, error) { + syncOption := syncFrequency == 0 + syncEveryOption := syncEvery + if syncOption { + syncEveryOption = 1 + // FIXME: this must be always a positive number, lets figure out something nicer + syncFrequency = 1 * time.Second + } + + walPath := filepath.Join(path, "wal") + err := os.MkdirAll(walPath, 0700) + if err != nil && !os.IsExist(err) { + return nil, err + } + + // TODO: sanitize id? 
+ _wal := diskqueue.New(id, walPath, + maxBytesPerFile, minMsgSize, maxMsgSize, syncEveryOption, syncFrequency, + func(lvl diskqueue.LogLevel, f string, args ...interface{}) { + dqLogOption(logger, lvl, f, args...) + }) + + wq := &WALQueue{ + logger: logger, + wal: _wal, + unmarshaler: unmarshaler, + stopped: atomic.NewUint32(0), + quit: make(chan struct{}), + } + + return wq, nil +} + +// StartConsumers starts the given number of consumers which will be consuming items +func (wq *WALQueue) StartConsumers(num int, callback func(item interface{})) { + wq.numWorkers = num + var startWG sync.WaitGroup + + factory := func() queue.Consumer { + return queue.ConsumerFunc(callback) + } + + for i := 0; i < wq.numWorkers; i++ { + wq.stopWG.Add(1) + startWG.Add(1) + go func() { + startWG.Done() + defer wq.stopWG.Done() + consumer := factory() + + for { + if wq.stopped.Load() != 0 { + return + } + req, err := wq.get() + if err == errNotFound || req == nil { + time.Sleep(1 * time.Second) + } else { + consumer.Consume(req) + } + } + }() + } + startWG.Wait() +} + +// Produce adds an item to the queue and returns true if it was accepted +func (wq *WALQueue) Produce(item interface{}) bool { + if wq.stopped.Load() != 0 { + return false + } + + err := wq.put(item.(request)) + return err == nil +} + +// Stop stops accepting items and shuts-down the queue and closes the WAL +func (wq *WALQueue) Stop() { + wq.logger.Debug("Stopping WAL") + wq.stopped.Store(1) + wq.quitOnce.Do(func() { + close(wq.quit) + }) + wq.stopWG.Wait() + err := wq.close() + if err != nil { + wq.logger.Error("Error when closing WAL", zap.Error(err)) + } +} + +// Size returns the current depth of the queue +func (wq *WALQueue) Size() int { + return int(wq.wal.Depth()) +} + +// put marshals the request and puts it into the WAL +func (wq *WALQueue) put(req request) error { + bytes, err := req.marshall() + if err != nil { + return err + } + + writeErr := wq.wal.Put(bytes) + if writeErr != nil { + return writeErr + } + + 
return nil +} + +// get returns the next request from the queue; note that it might be blocking if there are no entries in the WAL +func (wq *WALQueue) get() (request, error) { + // TODO: Consider making it nonblocking, e.g. timeout after 1 second or so? + // ticker := time.NewTicker(1*time.Second) + // case <-ticker.C.... + + select { + case bytes := <-wq.wal.ReadChan(): + return wq.unmarshaler(bytes) + case <-wq.quit: + return nil, nil + } +} + +func (wq *WALQueue) close() error { + return wq.wal.Close() +} diff --git a/exporter/exporterhelper/wal_test.go b/exporter/exporterhelper/wal_test.go new file mode 100644 index 00000000000..9d7d8ab8a33 --- /dev/null +++ b/exporter/exporterhelper/wal_test.go @@ -0,0 +1,184 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package exporterhelper
+
+import (
+	"context"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"go.uber.org/atomic"
+	"go.uber.org/zap"
+
+	"go.opentelemetry.io/collector/consumer/pdata"
+)
+
+// createTestQueue opens a WAL queue named "foo" under path with a development
+// logger, a 100ms sync frequency and a trace-request unmarshaler. It panics if
+// the logger or the queue cannot be created.
+func createTestQueue(path string) *WALQueue {
+	logger, err := zap.NewDevelopment()
+	if err != nil {
+		panic(err)
+	}
+
+	wq, err := newWALQueue(logger, path, "foo", 100*time.Millisecond, newTraceRequestUnmarshalerFunc(nopTracePusher()))
+	if err != nil {
+		panic(err)
+	}
+	return wq
+}
+
+// createTemporaryDirectory creates a fresh "wal-test" temporary directory and
+// returns its path; the caller is responsible for removing it. It panics on
+// failure.
+func createTemporaryDirectory() string {
+	directory, err := ioutil.TempDir("", "wal-test")
+	if err != nil {
+		panic(err)
+	}
+	return directory
+}
+
+// TestWal_RepeatPutCloseReadClose checks that a request written to the WAL is
+// still there after the queue is closed and reopened, and that it is gone once
+// it has been read.
+func TestWal_RepeatPutCloseReadClose(t *testing.T) {
+	path := createTemporaryDirectory()
+	defer os.RemoveAll(path)
+
+	traces := newTraces(1, 10)
+	req := newTracesRequest(context.Background(), traces, nopTracePusher())
+
+	for i := 0; i < 10; i++ {
+		wq := createTestQueue(path)
+		require.Equal(t, 0, wq.Size())
+		err := wq.put(req)
+		require.NoError(t, err)
+		err = wq.close()
+		require.NoError(t, err)
+
+		wq = createTestQueue(path)
+		require.Equal(t, 1, wq.Size())
+		readReq, err := wq.get()
+		require.NoError(t, err)
+		require.Equal(t, 0, wq.Size())
+		require.Equal(t, req.(*tracesRequest).td, readReq.(*tracesRequest).td)
+		err = wq.close()
+		require.NoError(t, err)
+	}
+
+	// No more items
+	wq := createTestQueue(path)
+	require.Equal(t, 0, wq.Size())
+	require.NoError(t, wq.close())
+}
+
+// TestWal_ConsumersProducers checks that every produced request is handed to
+// a consumer exactly once, for several producer/consumer ratios.
+func TestWal_ConsumersProducers(t *testing.T) {
+	cases := []struct {
+		numMessagesProduced int
+		numConsumers        int
+	}{
+		{
+			numMessagesProduced: 1,
+			numConsumers:        1,
+		},
+		{
+			numMessagesProduced: 100,
+			numConsumers:        1,
+		},
+		{
+			numMessagesProduced: 100,
+			numConsumers:        3,
+		},
+		{
+			numMessagesProduced: 1,
+			numConsumers:        100,
+		},
+		{
+			numMessagesProduced: 100,
+			numConsumers:        100,
+		},
+	}
+
+	for _, c := range cases {
+		t.Run(fmt.Sprintf("#messages: %d #consumers: %d", c.numMessagesProduced, c.numConsumers), func(t *testing.T) {
+			path := createTemporaryDirectory()
+			defer os.RemoveAll(path)
+
+			traces := newTraces(1, 10)
+			req := newTracesRequest(context.Background(), traces, nopTracePusher())
+
+			wq := createTestQueue(path)
+
+			// The callback runs on several consumer goroutines at once,
+			// so the counter has to be atomic.
+			numMessagesConsumed := atomic.NewInt32(0)
+			wq.StartConsumers(c.numConsumers, func(item interface{}) {
+				numMessagesConsumed.Inc()
+			})
+
+			for i := 0; i < c.numMessagesProduced; i++ {
+				require.True(t, wq.Produce(req))
+			}
+
+			// Wait (bounded) for the consumers to drain the queue.
+			require.Eventually(t, func() bool {
+				return int(numMessagesConsumed.Load()) == c.numMessagesProduced
+			}, 5*time.Second, 10*time.Millisecond)
+
+			// Stop (rather than close) so the consumer goroutines exit
+			// before the WAL is closed; afterwards the count is final.
+			wq.Stop()
+			require.Equal(t, c.numMessagesProduced, int(numMessagesConsumed.Load()))
+		})
+	}
+}
+
+// BenchmarkWal_1Trace10Spans measures writing b.N single-trace, ten-span
+// requests to the WAL and reading them back.
+func BenchmarkWal_1Trace10Spans(b *testing.B) {
+	path := createTemporaryDirectory()
+	defer os.RemoveAll(path)
+	wq := createTestQueue(path)
+
+	traces := newTraces(1, 10)
+	req := newTracesRequest(context.Background(), traces, nopTracePusher())
+
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		err := wq.put(req)
+		require.NoError(b, err)
+	}
+
+	for i := 0; i < b.N; i++ {
+		readReq, err := wq.get()
+		require.NoError(b, err)
+		require.NotNil(b, readReq)
+	}
+
+	require.NoError(b, wq.close())
+}
+
+// newTraces builds a pdata.Traces with a single resource that records
+// numTraces and numSpans as attributes and holds numTraces instrumentation
+// library entries of numSpans spans each. Trace and span IDs are derived from
+// the loop indices truncated to one byte.
+func newTraces(numTraces int, numSpans int) pdata.Traces {
+	traces := pdata.NewTraces()
+	batch := traces.ResourceSpans().AppendEmpty()
+	batch.Resource().Attributes().InsertString("resource-attr", "some-resource")
+	batch.Resource().Attributes().InsertInt("num-traces", int64(numTraces))
+	batch.Resource().Attributes().InsertInt("num-spans", int64(numSpans))
+
+	for i := 0; i < numTraces; i++ {
+		traceID := pdata.NewTraceID([16]byte{1, 2, 3, byte(i)})
+		ils := batch.InstrumentationLibrarySpans().AppendEmpty()
+		for j := 0; j < numSpans; j++ {
+			span := ils.Spans().AppendEmpty()
+			span.SetTraceID(traceID)
+			span.SetSpanID(pdata.NewSpanID([8]byte{1, 2, 3, byte(j)}))
+			span.SetName("should-not-be-changed")
+			span.Attributes().InsertInt("int-attribute", int64(j))
+			span.Attributes().InsertString("str-attribute-1", "foobar")
+			span.Attributes().InsertString("str-attribute-2", "fdslafjasdk12312312jkl")
+			span.Attributes().InsertString("str-attribute-3", "AbcDefGeKKjkfdsafasdfsdasdf")
+			span.Attributes().InsertString("str-attribute-4", "xxxxxx")
+			span.Attributes().InsertString("str-attribute-5", "abcdef")
+		}
+	}
+
+	return traces
+}
diff --git a/exporter/jaegerexporter/config_test.go b/exporter/jaegerexporter/config_test.go index 3e501f413bb..005c035d31c 100644 --- a/exporter/jaegerexporter/config_test.go +++ b/exporter/jaegerexporter/config_test.go @@ -60,9 +60,10 @@ func TestLoadConfig(t *testing.T) { MaxElapsedTime: 10 * time.Minute, }, QueueSettings: exporterhelper.QueueSettings{ - Enabled: true, - NumConsumers: 2, - QueueSize: 10, + Enabled: true, + NumConsumers: 2, + QueueSize: 10, + WalSyncFrequency: 1 * time.Second, }, GRPCClientSettings: configgrpc.GRPCClientSettings{ Endpoint: "a.new.target:1234", diff --git a/exporter/kafkaexporter/config_test.go b/exporter/kafkaexporter/config_test.go index b2f9e45e0ac..c887b1f280d 100644 --- a/exporter/kafkaexporter/config_test.go +++ b/exporter/kafkaexporter/config_test.go @@ -51,9 +51,10 @@ func TestLoadConfig(t *testing.T) { MaxElapsedTime: 10 * time.Minute, }, QueueSettings: exporterhelper.QueueSettings{ - Enabled: true, - NumConsumers: 2, - QueueSize: 10, + Enabled: true, + NumConsumers: 2, + QueueSize: 10, + WalSyncFrequency: 1 * time.Second, }, Topic: "spans", Encoding: "otlp_proto", diff --git a/exporter/otlpexporter/config_test.go b/exporter/otlpexporter/config_test.go index 65580d4108a..444d49d4136 100644 --- a/exporter/otlpexporter/config_test.go +++ b/exporter/otlpexporter/config_test.go @@ -58,9 +58,10 @@ func TestLoadConfig(t *testing.T) { MaxElapsedTime: 10 * time.Minute, }, QueueSettings: exporterhelper.QueueSettings{ - Enabled: true, - NumConsumers: 2, - QueueSize: 10, + Enabled: true, + NumConsumers: 2, + QueueSize: 10, + WalSyncFrequency: 1 * time.Second, }, GRPCClientSettings: configgrpc.GRPCClientSettings{ Headers: map[string]string{ diff --git a/exporter/otlphttpexporter/config_test.go b/exporter/otlphttpexporter/config_test.go index
1fd6670f656..7bbdfd5e09f 100644 --- a/exporter/otlphttpexporter/config_test.go +++ b/exporter/otlphttpexporter/config_test.go @@ -55,9 +55,10 @@ func TestLoadConfig(t *testing.T) { MaxElapsedTime: 10 * time.Minute, }, QueueSettings: exporterhelper.QueueSettings{ - Enabled: true, - NumConsumers: 2, - QueueSize: 10, + Enabled: true, + NumConsumers: 2, + QueueSize: 10, + WalSyncFrequency: 1 * time.Second, }, HTTPClientSettings: confighttp.HTTPClientSettings{ Headers: map[string]string{ diff --git a/exporter/zipkinexporter/config_test.go b/exporter/zipkinexporter/config_test.go index f4570108390..6473f7d936f 100644 --- a/exporter/zipkinexporter/config_test.go +++ b/exporter/zipkinexporter/config_test.go @@ -61,9 +61,10 @@ func TestLoadConfig(t *testing.T) { MaxElapsedTime: 10 * time.Minute, }, QueueSettings: exporterhelper.QueueSettings{ - Enabled: true, - NumConsumers: 2, - QueueSize: 10, + Enabled: true, + NumConsumers: 2, + QueueSize: 10, + WalSyncFrequency: 1 * time.Second, }, HTTPClientSettings: confighttp.HTTPClientSettings{ Endpoint: "https://somedest:1234/api/v2/spans", diff --git a/go.mod b/go.mod index 0adca801b93..f378f142d76 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/grpc-ecosystem/grpc-gateway v1.16.0 github.com/jaegertracing/jaeger v1.22.0 github.com/leoluk/perflib_exporter v0.1.0 + github.com/nsqio/go-diskqueue v1.0.0 github.com/openzipkin/zipkin-go v0.2.5 github.com/pquerna/cachecontrol v0.1.0 // indirect github.com/prometheus/client_golang v1.10.0 diff --git a/go.sum b/go.sum index d77e0bf774e..769b5e56567 100644 --- a/go.sum +++ b/go.sum @@ -758,6 +758,8 @@ github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxzi github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nbutton23/zxcvbn-go v0.0.0-20180912185939-ae427f1e4c1d/go.mod h1:o96djdrsSGy3AWPyBgZMAGfxZNfgntdJG+11KU4QvbU= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod 
h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/nsqio/go-diskqueue v1.0.0 h1:XRqpx7zTMu9yNVH+cHvA5jEiPNKoYcyEsCVqXP3eFg4= +github.com/nsqio/go-diskqueue v1.0.0/go.mod h1:INuJIxl4ayUsyoNtHL5+9MFPDfSZ0zY93hNY6vhBRsI= github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU=