Persistent storage in queued_retry, backed by file storage extension #3274

Merged
24 commits
ea5cd50
Basic persistent queue implementation based on storage extension inte…
pmm-sumo Jun 15, 2021
5b0bd72
Code cleanup and clarifications
pmm-sumo Jun 16, 2021
190434b
Replace atomic with channel
pmm-sumo Jun 16, 2021
05aa648
Handle queued_retry persistence for item processed by consumers
pmm-sumo Jun 22, 2021
12891e5
Fix test condition for capacity
pmm-sumo Jun 23, 2021
829b0fe
Remove unnecessary for loop
pmalek Jun 30, 2021
bd683d8
Update to recent core version
pmm-sumo Jul 12, 2021
24bc134
Use batching in persistent storage
pmm-sumo Jul 19, 2021
bc806fe
Make sure empty requests are ignored
pmm-sumo Jul 19, 2021
89253e7
Better logging and overall cleanup (addressing PR comments)
pmm-sumo Aug 25, 2021
6412dca
Update persistent queue to use internal Bounded Queue
pmm-sumo Aug 27, 2021
1bb552e
Do not operation if marshaling failed
pmm-sumo Aug 27, 2021
f331906
Better batch handling and removal of unnecessary mutex
pmm-sumo Aug 30, 2021
5545f0c
Replace polling with signaling through channel
pmm-sumo Aug 30, 2021
249c678
Remove persistent queue mutexes
pmm-sumo Aug 31, 2021
7b3794e
Mute logs during benchmarks
pmm-sumo Aug 31, 2021
64b713a
Cleanup error messages
pmm-sumo Aug 31, 2021
647e7bf
Use enable_unstable build flag to control inclusion of persistent buf…
pmm-sumo Sep 2, 2021
e35bb59
Use more clear terminology: "dispatch" rather than "processing"
pmm-sumo Sep 3, 2021
7484784
Doc clarification on enable_unstable flag
pmm-sumo Sep 3, 2021
b7a78a3
Fix small typo in README.md
pmm-sumo Sep 3, 2021
9ca46a0
Include unstable collector in build artifacts
pmm-sumo Sep 3, 2021
5326031
Rename queued_retry experimental and inmemory implementation files
pmm-sumo Sep 3, 2021
0a65246
Fix error in Makefile
pmm-sumo Sep 3, 2021
2 changes: 1 addition & 1 deletion .circleci/config.yml
@@ -114,7 +114,7 @@ jobs:
- attach_to_workspace
- run:
name: Build collector for all archs
- command: grep ^binaries-all-sys Makefile|fmt -w 1|tail -n +2|circleci tests split|xargs make
+ command: grep ^binaries-all-sys Makefile|fmt -w 1|grep -v binaries-all-sys|circleci tests split|xargs make
- run:
name: Log checksums to console
command: shasum -a 256 bin/*
10 changes: 10 additions & 0 deletions .github/workflows/build-and-test.yml
@@ -167,3 +167,13 @@ jobs:
with:
name: collector-binaries
path: ./bin.tar
- name: Build Unstable Collector for All Architectures
run: make binaries-all-sys-unstable
- name: Create Unstable Collector Binaries Archive
run: tar -cvf bin-unstable.tar ./bin/*unstable
- name: Upload Unstable Collector Binaries
uses: actions/[email protected]
with:
name: collector-binaries-unstable
path: ./bin-unstable.tar

36 changes: 34 additions & 2 deletions Makefile
@@ -66,7 +66,7 @@ gotestinstall:

.PHONY: gotest
gotest:
- @$(MAKE) for-all CMD="make test"
+ @$(MAKE) for-all CMD="make test test-unstable"

.PHONY: gobenchmark
gobenchmark:
@@ -81,7 +81,7 @@ gotest-with-cover:

.PHONY: golint
golint:
- @$(MAKE) for-all CMD="make lint"
+ @$(MAKE) for-all CMD="make lint lint-unstable"

.PHONY: goimpi
goimpi:
@@ -148,6 +148,11 @@ otelcol:
go generate ./...
$(MAKE) build-binary-internal

.PHONY: otelcol-unstable
otelcol-unstable:
go generate ./...
$(MAKE) build-binary-internal-unstable

.PHONY: run
run:
GO111MODULE=on go run --race ./cmd/otelcol/... --config ${RUN_CONFIG} ${RUN_ARGS}
@@ -213,6 +218,9 @@ docker-otelcol:
.PHONY: binaries-all-sys
binaries-all-sys: binaries-darwin_amd64 binaries-darwin_arm64 binaries-linux_amd64 binaries-linux_arm64 binaries-windows_amd64

.PHONY: binaries-all-sys-unstable
binaries-all-sys-unstable: binaries-darwin_amd64-unstable binaries-darwin_arm64-unstable binaries-linux_amd64-unstable binaries-linux_arm64-unstable binaries-windows_amd64-unstable

.PHONY: binaries-darwin_amd64
binaries-darwin_amd64:
GOOS=darwin GOARCH=amd64 $(MAKE) build-binary-internal
@@ -233,10 +241,34 @@ binaries-linux_arm64:
binaries-windows_amd64:
GOOS=windows GOARCH=amd64 EXTENSION=.exe $(MAKE) build-binary-internal

.PHONY: binaries-darwin_amd64-unstable
binaries-darwin_amd64-unstable:
GOOS=darwin GOARCH=amd64 $(MAKE) build-binary-internal-unstable

.PHONY: binaries-darwin_arm64-unstable
binaries-darwin_arm64-unstable:
GOOS=darwin GOARCH=arm64 $(MAKE) build-binary-internal-unstable

.PHONY: binaries-linux_amd64-unstable
binaries-linux_amd64-unstable:
GOOS=linux GOARCH=amd64 $(MAKE) build-binary-internal-unstable

.PHONY: binaries-linux_arm64-unstable
binaries-linux_arm64-unstable:
GOOS=linux GOARCH=arm64 $(MAKE) build-binary-internal-unstable

.PHONY: binaries-windows_amd64-unstable
binaries-windows_amd64-unstable:
GOOS=windows GOARCH=amd64 EXTENSION=.exe $(MAKE) build-binary-internal-unstable

.PHONY: build-binary-internal
build-binary-internal:
GO111MODULE=on CGO_ENABLED=0 go build -trimpath -o ./bin/otelcol_$(GOOS)_$(GOARCH)$(EXTENSION) $(BUILD_INFO) ./cmd/otelcol

.PHONY: build-binary-internal-unstable
build-binary-internal-unstable:
GO111MODULE=on CGO_ENABLED=0 go build -trimpath -o ./bin/otelcol_$(GOOS)_$(GOARCH)$(EXTENSION)_unstable $(BUILD_INFO) -tags enable_unstable ./cmd/otelcol

.PHONY: deb-rpm-package
%-package: ARCH ?= amd64
%-package:
8 changes: 8 additions & 0 deletions Makefile.Common
@@ -12,6 +12,10 @@ IMPI=impi
test:
@echo $(ALL_PKGS) | xargs -n 10 $(GOTEST) $(GOTEST_OPT)

.PHONY: test-unstable
test-unstable:
@echo $(ALL_PKGS) | xargs -n 10 $(GOTEST) $(GOTEST_OPT) -tags enable_unstable

.PHONY: benchmark
benchmark:
$(GOTEST) -bench=. -run=notests ./...
@@ -25,6 +29,10 @@ fmt:
lint:
$(LINT) run --allow-parallel-runners

.PHONY: lint-unstable
lint-unstable:
$(LINT) run --allow-parallel-runners --build-tags enable_unstable

.PHONY: impi
impi:
@$(IMPI) --local go.opentelemetry.io/collector --scheme stdThirdPartyLocal ./...
82 changes: 81 additions & 1 deletion exporter/exporterhelper/README.md
@@ -17,12 +17,92 @@ The following configuration options can be modified:
- `sending_queue`
- `enabled` (default = true)
- `num_consumers` (default = 10): Number of consumers that dequeue batches; ignored if `enabled` is `false`
- - `queue_size` (default = 5000): Maximum number of batches kept in memory before data; ignored if `enabled` is `false`;
+ - `queue_size` (default = 5000): Maximum number of batches kept in memory or on disk (for persistent storage) before dropping; ignored if `enabled` is `false`
Users should calculate this as `num_seconds * requests_per_second` where:
- `num_seconds` is the number of seconds to buffer in case of a backend outage
- `requests_per_second` is the average number of requests per second.
- `persistent_storage_enabled` (default = false): When set, enables persistence via a file storage extension
(note, `enable_unstable` build tag needs to be enabled first, see below for more details)
- `resource_to_telemetry_conversion`
- `enabled` (default = false): If `enabled` is `true`, all the resource attributes will be converted to metric labels by default.
- `timeout` (default = 5s): Time to wait per individual attempt to send data to a backend.

The full list of settings exposed for this helper exporter is documented [here](factory.go).
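
As a worked example of the `queue_size` formula above, the Go sketch below multiplies an assumed outage window by an assumed request rate. The helper name `queueSize` and the sample numbers are illustrative only and are not part of `exporterhelper`; rounding up is a choice made here so that a fractional rate does not under-provision the queue.

```go
package main

import (
	"fmt"
	"math"
)

// queueSize is a hypothetical helper (not part of exporterhelper). It applies
// queue_size = num_seconds * requests_per_second and rounds the result up.
func queueSize(numSeconds int, requestsPerSecond float64) int {
	return int(math.Ceil(float64(numSeconds) * requestsPerSecond))
}

func main() {
	// Assumed scenario: buffer a 5-minute backend outage at 10 requests/s.
	fmt.Println(queueSize(300, 10)) // 3000
}
```

With these assumed inputs the default of 5000 would already cover the outage window; a higher request rate or a longer window would call for a larger value.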

### Persistent Queue

**Status: under development**

> :warning: This capability is under development and can currently be enabled only in OpenTelemetry
> Collector Contrib with the `enable_unstable` build tag set.

When `persistent_storage_enabled` is set to true, the queue is buffered to disk using the
[file storage extension](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/storage/filestorage).
If the collector instance is killed while items remain in the persistent queue, the items are picked up on restart and
exporting continues.

```
                                                              ┌─Consumer #1─┐
                                                              │    ┌───┐    │
                              ──────Deleted──────        ┌───►│    │ 1 │    ├───► Success
        Waiting in channel    x           x     x        │    │    └───┘    │
        for consumer ───┐     x           x     x        │    │             │
                        │     x           x     x        │    └─────────────┘
                        ▼     x           x     x        │
┌─────────────────────────────────────────x─────x───┐    │    ┌─Consumer #2─┐
│                             x           x     x   │    │    │    ┌───┐    │
│     ┌───┐     ┌───┐ ┌───┐ ┌─x─┐ ┌───┐ ┌─x─┐ ┌─x─┐ │    │    │    │ 2 │    ├───► Permanent -> X
│ n+1 │ n │ ... │ 6 │ │ 5 │ │ 4 │ │ 3 │ │ 2 │ │ 1 │ ├────┼───►│    └───┘    │      failure
│     └───┘     └───┘ └───┘ └───┘ └───┘ └───┘ └───┘ │    │    │             │
│                                                   │    │    └─────────────┘
└───────────────────────────────────────────────────┘    │
   ▲              ▲     ▲           ▲                    │    ┌─Consumer #3─┐
   │              │     │           │                    │    │    ┌───┐    │
   │              │     │           │                    │    │    │ 3 │    ├───► (in progress)
 write          read    └─────┬─────┘                    ├───►│    └───┘    │
 index          index         │                          │    │             │
   ▲                          │                          │    └─────────────┘
   │                          │                          │
   │                      currently                      │    ┌─Consumer #4─┐
   │                      dispatched                     │    │    ┌───┐    │     Temporary
   │                                                     └───►│    │ 4 │    ├───►  failure
   │                                                          │    └───┘    │         │
   │                                                          │             │         │
   │                                                          └─────────────┘         │
   │                                                                 ▲                │
   │                                                                 └── Retry ───────┤
   │                                                                                  │
   │                                                                                  │
   └─────────────────────────────────────── Requeuing ◄────── Retry limit exceeded ───┘
```
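
The bookkeeping in the diagram can be sketched roughly as follows. This is a minimal in-memory illustration, not the implementation from this PR: the type `sketchQueue`, its methods, and the map that stands in for the file storage client are all hypothetical, and the restart behaviour (re-adding items that were dispatched but never finished) is an assumption based on the description above.

```go
package main

import "fmt"

// sketchQueue is a hypothetical stand-in for the disk-backed queue. The maps
// play the role of the storage extension; nothing is written to disk here.
type sketchQueue struct {
	items      map[uint64][]byte // stored payloads keyed by index
	dispatched map[uint64]bool   // indexes handed to consumers but not finished
	readIndex  uint64            // next index to hand to a consumer
	writeIndex uint64            // next free index
}

func newSketchQueue() *sketchQueue {
	return &sketchQueue{items: map[uint64][]byte{}, dispatched: map[uint64]bool{}}
}

// put stores a payload at the write index and advances the index.
func (q *sketchQueue) put(payload []byte) {
	q.items[q.writeIndex] = payload
	q.writeIndex++
}

// get hands the item at the read index to a consumer and marks it as
// dispatched. The payload stays in storage until finish is called.
func (q *sketchQueue) get() (uint64, []byte, bool) {
	if q.readIndex == q.writeIndex {
		return 0, nil, false
	}
	idx := q.readIndex
	q.readIndex++
	q.dispatched[idx] = true
	return idx, q.items[idx], true
}

// finish deletes an item once its dispatch has ended (success or permanent failure).
func (q *sketchQueue) finish(idx uint64) {
	delete(q.items, idx)
	delete(q.dispatched, idx)
}

// recoverDispatched models a restart: items that were dispatched but never
// finished are appended to the queue again so that they are not lost.
func (q *sketchQueue) recoverDispatched() {
	for idx := range q.dispatched {
		q.put(q.items[idx])
		q.finish(idx)
	}
}

// size reports how many items are still waiting to be dispatched.
func (q *sketchQueue) size() int { return int(q.writeIndex - q.readIndex) }

func main() {
	q := newSketchQueue()
	q.put([]byte("a"))
	q.put([]byte("b"))

	idxA, _, _ := q.get() // "a" is dispatched to a consumer
	q.finish(idxA)        // the consumer succeeded, so "a" is deleted

	q.get()               // "b" is dispatched, then the process is "killed"
	q.recoverDispatched() // on restart "b" is queued again

	_, payload, _ := q.get()
	fmt.Println(string(payload), q.size())
}
```

Keeping the payload in storage until `finish` runs is what lets an interrupted dispatch be replayed; deleting at `get` time would lose the item if the process stopped mid-export.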

Example:

```
receivers:
  otlp:
    protocols:
      grpc:
exporters:
  otlp:
    endpoint: <ENDPOINT>
    sending_queue:
      persistent_storage_enabled: true
extensions:
  file_storage:
    directory: /var/lib/storage/otc
    timeout: 10s
service:
  extensions: [file_storage]
  pipelines:
    metrics:
      receivers: [otlp]
      exporters: [otlp]
    logs:
      receivers: [otlp]
      exporters: [otlp]
    traces:
      receivers: [otlp]
      exporters: [otlp]

```
28 changes: 24 additions & 4 deletions exporter/exporterhelper/common.go
@@ -52,16 +52,26 @@ type request interface {
onError(error) request
// Returns the count of spans/metric points or log records.
count() int
// marshal serializes the current request into a byte stream
marshal() ([]byte, error)
// onProcessingFinished calls the optional callback function to handle cleanup after all processing is finished
onProcessingFinished()
// setOnProcessingFinished sets an optional callback function to do the cleanup (e.g. remove the item from persistent queue)
setOnProcessingFinished(callback func())
}

// requestUnmarshaler defines a function which takes a byte slice and unmarshals it into a relevant request
type requestUnmarshaler func([]byte) (request, error)

// requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs).
type requestSender interface {
send(req request) error
}

// baseRequest is a base implementation for the request.
type baseRequest struct {
- ctx context.Context
+ ctx context.Context
+ processingFinishedCallback func()
}

func (req *baseRequest) context() context.Context {
@@ -72,6 +82,16 @@ func (req *baseRequest) setContext(ctx context.Context) {
req.ctx = ctx
}

func (req *baseRequest) setOnProcessingFinished(callback func()) {
req.processingFinishedCallback = callback
}

func (req *baseRequest) onProcessingFinished() {
if req.processingFinishedCallback != nil {
req.processingFinishedCallback()
}
}

// baseSettings represents all the options that users can configure.
type baseSettings struct {
componentOptions []componenthelper.Option
@@ -159,7 +179,7 @@ type baseExporter struct {
qrSender *queuedRetrySender
}

- func newBaseExporter(cfg config.Exporter, set component.ExporterCreateSettings, bs *baseSettings) *baseExporter {
+ func newBaseExporter(cfg config.Exporter, set component.ExporterCreateSettings, bs *baseSettings, signal config.DataType, reqUnmarshaler requestUnmarshaler) *baseExporter {
be := &baseExporter{
Component: componenthelper.New(bs.componentOptions...),
}
@@ -169,7 +189,7 @@ func newBaseExporter(cfg config.Exporter, set component.ExporterCreateSettings,
ExporterID: cfg.ID(),
ExporterCreateSettings: set,
})
- be.qrSender = newQueuedRetrySender(cfg.ID().String(), bs.QueueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger)
+ be.qrSender = newQueuedRetrySender(cfg.ID(), signal, bs.QueueSettings, bs.RetrySettings, reqUnmarshaler, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger)
be.sender = be.qrSender

return be
@@ -189,7 +209,7 @@ func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
}

// If no error then start the queuedRetrySender.
- return be.qrSender.start()
+ return be.qrSender.start(ctx, host)
}

// Shutdown all senders and exporter and is invoked during service shutdown.
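
The callback pair added to `baseRequest` in this file can be exercised in isolation. The sketch below copies `setOnProcessingFinished` and `onProcessingFinished` from the hunk above; everything else is an assumption made for illustration: the `dispatch` function, the `store` map that stands in for persistent storage, and the key `item-1` are hypothetical and do not appear in the PR.

```go
package main

import "fmt"

// baseRequest mirrors the two methods added in this diff. The context field
// is omitted to keep the sketch short.
type baseRequest struct {
	processingFinishedCallback func()
}

func (req *baseRequest) setOnProcessingFinished(callback func()) {
	req.processingFinishedCallback = callback
}

func (req *baseRequest) onProcessingFinished() {
	if req.processingFinishedCallback != nil {
		req.processingFinishedCallback()
	}
}

// dispatch is a hypothetical consumer body: it sends the request and signals
// that processing is finished once send returns, whatever the outcome.
func dispatch(req *baseRequest, send func() error) error {
	defer req.onProcessingFinished()
	return send()
}

func main() {
	// store is a hypothetical stand-in for the persistent storage client.
	store := map[string][]byte{"item-1": []byte("payload")}

	req := &baseRequest{}
	req.setOnProcessingFinished(func() { delete(store, "item-1") })

	err := dispatch(req, func() error { return nil })
	fmt.Println(err, len(store))
}
```

Because the callback is optional and nil-checked, requests that never pass through a persistent queue can call `onProcessingFinished` without any extra setup.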
16 changes: 15 additions & 1 deletion exporter/exporterhelper/common_test.go
@@ -26,6 +26,8 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer/consumerhelper"
"go.opentelemetry.io/collector/model/pdata"
)

var (
@@ -37,7 +39,7 @@ var (
)

func TestBaseExporter(t *testing.T) {
- be := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions())
+ be := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(), "", nopRequestUnmarshaler())
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
}
@@ -51,6 +53,8 @@ func TestBaseExporterWithOptions(t *testing.T) {
WithStart(func(ctx context.Context, host component.Host) error { return want }),
WithShutdown(func(ctx context.Context) error { return want }),
WithTimeout(DefaultTimeoutSettings())),
"",
nopRequestUnmarshaler(),
)
require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost()))
require.Equal(t, want, be.Shutdown(context.Background()))
@@ -64,3 +68,13 @@ func checkStatus(t *testing.T, sd *oteltest.Span, err error) {
require.Equal(t, codes.Unset, sd.StatusCode(), "SpanData %v", sd)
}
}

func nopTracePusher() consumerhelper.ConsumeTracesFunc {
return func(ctx context.Context, td pdata.Traces) error {
return nil
}
}

func nopRequestUnmarshaler() requestUnmarshaler {
return newTraceRequestUnmarshalerFunc(nopTracePusher())
}
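
To illustrate how `marshal` and a `requestUnmarshaler` fit together, here is a small round-trip sketch. The `requestUnmarshaler` type has the same shape as the one introduced in `common.go`; the `textRequest` type, the `newTextRequestUnmarshaler` constructor, and the use of JSON are assumptions chosen for readability and are not how the collector encodes traces, metrics, or logs.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// request is reduced to the one method this sketch needs.
type request interface {
	marshal() ([]byte, error)
}

// requestUnmarshaler has the same shape as the type added in common.go.
type requestUnmarshaler func([]byte) (request, error)

// textRequest is a hypothetical request type used only for this example.
type textRequest struct {
	Lines []string `json:"lines"`
}

func (r *textRequest) marshal() ([]byte, error) { return json.Marshal(r) }

// newTextRequestUnmarshaler returns an unmarshaler for textRequest. JSON is
// an assumption for illustration, not the collector's encoding.
func newTextRequestUnmarshaler() requestUnmarshaler {
	return func(b []byte) (request, error) {
		if len(b) == 0 {
			return nil, errors.New("empty payload")
		}
		req := &textRequest{}
		if err := json.Unmarshal(b, req); err != nil {
			return nil, err
		}
		return req, nil
	}
}

func main() {
	original := &textRequest{Lines: []string{"span-a", "span-b"}}
	raw, err := original.marshal()
	if err != nil {
		panic(err)
	}
	restored, err := newTextRequestUnmarshaler()(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(restored.(*textRequest).Lines)
}
```

Passing the unmarshaler in as a function value, as `newBaseExporter` does with `reqUnmarshaler`, lets one queue implementation restore different request types without knowing their concrete form.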
32 changes: 32 additions & 0 deletions exporter/exporterhelper/consumers_queue.go
@@ -0,0 +1,32 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package exporterhelper

// consumersQueue is largely based on queue.BoundedQueue and matches the subset used in the collector
// It describes a producer-consumer exchange which can be backed by e.g. the memory-based ring buffer queue
// (queue.BoundedQueue) or via disk-based queue (persistentQueue)
type consumersQueue interface {
// StartConsumers starts a given number of goroutines consuming items from the queue
// and passing them into the consumer callback.
StartConsumers(num int, callback func(item interface{}))
// Produce is used by the producer to submit new item to the queue. Returns false if the item wasn't added
// to the queue due to queue overflow.
Produce(item interface{}) bool
Review comment (Member): I know this is what queue.BoundedQueue uses but I find Produce to be a weird name for what it does. Perhaps instead use Add, Put, Push or similar?

Reply (Contributor Author): I think that we could refactor BoundedQueue in a separate PR

// Stop stops all consumers, as well as the length reporter if started,
// and releases the items channel. It blocks until all consumers have stopped.
Stop()
Review comment (Member): Perhaps add cancellable Context to facilitate timely shutdown?

Reply (Contributor Author): Like above, I think we could organise it through a separate PR, now when BoundedQueue is copied into OpenTelemetry Collector codebase

// Size returns the current Size of the queue
Size() int
}
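
For readers who want to see the `consumersQueue` contract in action, the following is a minimal sketch of an implementation backed by a buffered channel. The interface is repeated from the file above; `chanQueue` and `newChanQueue` are hypothetical names, and this is neither the collector's `BoundedQueue` nor the persistent queue from this PR.

```go
package main

import (
	"fmt"
	"sync"
)

// consumersQueue repeats the interface from consumers_queue.go.
type consumersQueue interface {
	StartConsumers(num int, callback func(item interface{}))
	Produce(item interface{}) bool
	Stop()
	Size() int
}

// chanQueue is a hypothetical, minimal implementation backed by a buffered channel.
type chanQueue struct {
	items chan interface{}
	wg    sync.WaitGroup
}

func newChanQueue(capacity int) *chanQueue {
	return &chanQueue{items: make(chan interface{}, capacity)}
}

// StartConsumers starts num goroutines that pass each item to callback.
func (q *chanQueue) StartConsumers(num int, callback func(item interface{})) {
	for i := 0; i < num; i++ {
		q.wg.Add(1)
		go func() {
			defer q.wg.Done()
			for item := range q.items {
				callback(item)
			}
		}()
	}
}

// Produce never blocks: it reports false when the buffer is full.
func (q *chanQueue) Produce(item interface{}) bool {
	select {
	case q.items <- item:
		return true
	default:
		return false
	}
}

// Stop closes the channel and waits until the consumers have drained it.
// In this sketch Produce must not be called after Stop.
func (q *chanQueue) Stop() {
	close(q.items)
	q.wg.Wait()
}

// Size returns the number of items currently buffered.
func (q *chanQueue) Size() int { return len(q.items) }

// Compile-time check that chanQueue satisfies the interface.
var _ consumersQueue = (*chanQueue)(nil)

func main() {
	q := newChanQueue(2)
	fmt.Println(q.Produce("a"), q.Produce("b"), q.Produce("c")) // true true false

	var mu sync.Mutex
	seen := 0
	q.StartConsumers(2, func(item interface{}) {
		mu.Lock()
		seen++
		mu.Unlock()
	})
	q.Stop()
	fmt.Println(seen, q.Size()) // 2 0
}
```

The non-blocking `Produce` matches the documented behaviour of returning false on overflow, which is what lets the caller decide to drop data rather than stall the pipeline.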