Skip to content

Commit

Permalink
updated exporter to use consumererrors to enable exporthelper retry l…
Browse files Browse the repository at this point in the history
…ogic

Signed-off-by: Granville Schmidt <[email protected]>
  • Loading branch information
gramidt committed Feb 11, 2021
1 parent 31954b6 commit 270a32d
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 17 deletions.
13 changes: 5 additions & 8 deletions exporter/lokiexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,17 @@ func (l *lokiExporter) pushLogData(ctx context.Context, ld pdata.Logs) (numDropp

pushReq, numDroppedLogs := l.logDataToLoki(ld)
if len(pushReq.Streams) == 0 {
return numDroppedLogs, nil
return ld.LogRecordCount(), consumererror.Permanent(fmt.Errorf("failed to transform logs into Loki log streams"))
}

buf, err := encode(pushReq)
if err != nil {
numDroppedLogs += ld.LogRecordCount()
return numDroppedLogs, err
return ld.LogRecordCount(), consumererror.Permanent(err)
}

req, err := http.NewRequestWithContext(ctx, "POST", l.config.HTTPClientSettings.Endpoint, bytes.NewReader(buf))
if err != nil {
return numDroppedLogs, consumererror.Permanent(err)
return ld.LogRecordCount(), consumererror.Permanent(err)
}

for k, v := range l.config.HTTPClientSettings.Headers {
Expand All @@ -86,17 +85,15 @@ func (l *lokiExporter) pushLogData(ctx context.Context, ld pdata.Logs) (numDropp

resp, err := l.client.Do(req)
if err != nil {
numDroppedLogs += ld.LogRecordCount()
return numDroppedLogs, err
return ld.LogRecordCount(), consumererror.PartialLogsError(err, ld)
}

_, _ = io.Copy(ioutil.Discard, resp.Body)
_ = resp.Body.Close()

if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
err = fmt.Errorf("HTTP %d %q", resp.StatusCode, http.StatusText(resp.StatusCode))
numDroppedLogs += ld.LogRecordCount()
return numDroppedLogs, err
return ld.LogRecordCount(), consumererror.PartialLogsError(err, ld)
}

return numDroppedLogs, nil
Expand Down
27 changes: 18 additions & 9 deletions exporter/lokiexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/translator/conventions"
"go.uber.org/zap"
Expand Down Expand Up @@ -150,7 +151,7 @@ func TestExporter_pushLogData(t *testing.T) {
config *Config
genLogsFunc func() pdata.Logs
numDroppedLogs int
shouldErr bool
errFunc func(err error)
}{
{
name: "happy path",
Expand All @@ -168,7 +169,10 @@ func TestExporter_pushLogData(t *testing.T) {
testServer: true,
genLogsFunc: genericGenLogsFunc,
numDroppedLogs: 10,
shouldErr: true,
errFunc: func(err error) {
e := err.(consumererror.PartialError)
require.Equal(t, 10, e.GetLogs().LogRecordCount())
},
},
{
name: "server unavailable",
Expand All @@ -178,7 +182,10 @@ func TestExporter_pushLogData(t *testing.T) {
testServer: false,
genLogsFunc: genericGenLogsFunc,
numDroppedLogs: 10,
shouldErr: true,
errFunc: func(err error) {
e := err.(consumererror.PartialError)
require.Equal(t, 10, e.GetLogs().LogRecordCount())
},
},
{
name: "with no matching attributes",
Expand All @@ -193,7 +200,10 @@ func TestExporter_pushLogData(t *testing.T) {
}))
},
numDroppedLogs: 10,
shouldErr: false,
errFunc: func(err error) {
require.True(t, consumererror.IsPermanent(err))
require.Equal(t, "Permanent error: failed to transform logs into Loki log streams", err.Error())
},
},
{
name: "with partial matching attributes",
Expand All @@ -210,18 +220,17 @@ func TestExporter_pushLogData(t *testing.T) {
conventions.AttributeK8sCluster: pdata.NewAttributeValueString("local"),
"severity": pdata.NewAttributeValueString("debug"),
}))
matchingLogs.ResourceLogs().CopyTo(outLogs.ResourceLogs())
matchingLogs.ResourceLogs().MoveAndAppendTo(outLogs.ResourceLogs())

nonMatchingLogs := createLogData(5,
pdata.NewAttributeMap().InitFromMap(map[string]pdata.AttributeValue{
"not.a.match": pdata.NewAttributeValueString("random"),
}))
nonMatchingLogs.ResourceLogs().CopyTo(outLogs.ResourceLogs())
nonMatchingLogs.ResourceLogs().MoveAndAppendTo(outLogs.ResourceLogs())

return outLogs
},
numDroppedLogs: 5,
shouldErr: false,
},
}
for _, tt := range tests {
Expand All @@ -248,8 +257,8 @@ func TestExporter_pushLogData(t *testing.T) {

numDroppedLogs, err := exp.pushLogData(context.Background(), tt.genLogsFunc())

if tt.shouldErr {
assert.Error(t, err)
if tt.errFunc != nil {
tt.errFunc(err)
return
}

Expand Down
4 changes: 4 additions & 0 deletions exporter/lokiexporter/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ github.com/dave/jennifer v1.2.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhr
github.com/davecgh/go-spew v0.0.0-20151105211317-5215b55f46b2/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v0.0.0-20161028175848-04cdfd42973b/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/denisenkom/go-mssqldb v0.0.0-20190515213511-eb9f6a1743f3/go.mod h1:zAg7JM8CkOJ43xKXIj7eRO9kmWm/TW578qo+oDO6tuM=
github.com/dgraph-io/badger v1.6.2/go.mod h1:JW2yswe3V058sS0kZ2h/AXeDSqFjxnZcRrVH//y2UQE=
Expand Down Expand Up @@ -947,6 +948,7 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
github.com/pkg/term v0.0.0-20180730021639-bffc007b7fd5/go.mod h1:eCbImbZ95eXtAUIbLAuAVnBnwf83mjf6QIVH8SHYwqQ=
github.com/pmezard/go-difflib v0.0.0-20151028094244-d8ed2627bdf0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
Expand Down Expand Up @@ -1123,6 +1125,7 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
Expand Down Expand Up @@ -1717,6 +1720,7 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20191120175047-4206685974f2/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200603094226-e3079894b1e8/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk=
Expand Down

0 comments on commit 270a32d

Please sign in to comment.