Skip to content

Commit

Permalink
producer.messages.count: Use kgo.Err as reason (#593)
Browse files Browse the repository at this point in the history
Updates the `error_reason` dimension to use the message of the returned
Kafka error (`kerr.Error.Message`). This allows us to classify produce
errors more granularly by reason.

Previously, all produce failures that weren't caused by a context timeout
or cancellation were classified as `unknown`.

---------

Signed-off-by: Marc Lopez Rubio <[email protected]>
  • Loading branch information
marclop authored Nov 6, 2024
1 parent 1451744 commit 3f71ee8
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 19 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
GO_TEST_TIMEOUT=60s
GO_TEST_TIMEOUT=180s
GOTESTFLAGS=
GO_TEST_COUNT=10

Expand Down
23 changes: 14 additions & 9 deletions kafka/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"strings"
"time"

"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -374,7 +375,7 @@ func (h *metricHooks) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration,
)
}

func (h *metricHooks) OnBrokerDisconnect(meta kgo.BrokerMetadata, _ net.Conn) {
func (h *metricHooks) OnBrokerDisconnect(kgo.BrokerMetadata, net.Conn) {
attrs := make([]attribute.KeyValue, 0, 2)
attrs = append(attrs, semconv.MessagingSystem("kafka"))
if h.namespace != "" {
Expand All @@ -387,7 +388,7 @@ func (h *metricHooks) OnBrokerDisconnect(meta kgo.BrokerMetadata, _ net.Conn) {
)
}

func (h *metricHooks) OnBrokerWrite(meta kgo.BrokerMetadata, key int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) {
func (h *metricHooks) OnBrokerWrite(_ kgo.BrokerMetadata, key int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) {
attrs := make([]attribute.KeyValue, 0, 3)
attrs = append(attrs,
semconv.MessagingSystem("kafka"),
Expand Down Expand Up @@ -419,7 +420,7 @@ func (h *metricHooks) OnBrokerWrite(meta kgo.BrokerMetadata, key int16, bytesWri
)
}

func (h *metricHooks) OnBrokerRead(meta kgo.BrokerMetadata, _ int16, bytesRead int, readWait, timeToRead time.Duration, err error) {
func (h *metricHooks) OnBrokerRead(_ kgo.BrokerMetadata, _ int16, bytesRead int, readWait, timeToRead time.Duration, err error) {
attrs := make([]attribute.KeyValue, 0, 3)
attrs = append(attrs, semconv.MessagingSystem("kafka"))
if h.namespace != "" {
Expand Down Expand Up @@ -449,7 +450,7 @@ func (h *metricHooks) OnBrokerRead(meta kgo.BrokerMetadata, _ int16, bytesRead i
}

// OnProduceBatchWritten is called when a batch has been produced.
func (h *metricHooks) OnProduceBatchWritten(meta kgo.BrokerMetadata,
func (h *metricHooks) OnProduceBatchWritten(_ kgo.BrokerMetadata,
topic string, partition int32, m kgo.ProduceBatchMetrics,
) {
attrs := make([]attribute.KeyValue, 0, 7)
Expand Down Expand Up @@ -491,7 +492,7 @@ func (h *metricHooks) OnProduceBatchWritten(meta kgo.BrokerMetadata,

// OnFetchBatchRead is called once per batch read from Kafka. Records
// `consumer.messages.fetched`.
func (h *metricHooks) OnFetchBatchRead(meta kgo.BrokerMetadata,
func (h *metricHooks) OnFetchBatchRead(_ kgo.BrokerMetadata,
topic string, partition int32, m kgo.FetchBatchMetrics,
) {
attrs := make([]attribute.KeyValue, 0, 6)
Expand Down Expand Up @@ -550,11 +551,15 @@ func (h *metricHooks) OnProduceRecordUnbuffered(r *kgo.Record, err error) {
attrs = append(attrs, attribute.String("namespace", h.namespace))
}

if errors.Is(err, context.DeadlineExceeded) {
var kgoErr *kerr.Error
switch {
case errors.Is(err, context.DeadlineExceeded):
attrs = append(attrs, attribute.String(errorReasonKey, "timeout"))
} else if errors.Is(err, context.Canceled) {
case errors.Is(err, context.Canceled):
attrs = append(attrs, attribute.String(errorReasonKey, "canceled"))
} else {
case errors.As(err, &kgoErr):
attrs = append(attrs, attribute.String(errorReasonKey, kgoErr.Message))
default:
attrs = append(attrs, attribute.String(errorReasonKey, "unknown"))
}

Expand Down Expand Up @@ -586,7 +591,7 @@ func (h *metricHooks) OnFetchRecordUnbuffered(r *kgo.Record, polled bool) {
)
}

func (h *metricHooks) OnBrokerThrottle(meta kgo.BrokerMetadata, throttleInterval time.Duration, throttledAfterResponse bool) {
func (h *metricHooks) OnBrokerThrottle(_ kgo.BrokerMetadata, throttleInterval time.Duration, throttledAfterResponse bool) {
attrs := make([]attribute.KeyValue, 0, 2)
attrs = append(attrs, semconv.MessagingSystem("kafka"))
if h.namespace != "" {
Expand Down
49 changes: 40 additions & 9 deletions kafka/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"go.opentelemetry.io/otel/attribute"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
Expand All @@ -44,17 +45,18 @@ func TestProducerMetrics(t *testing.T) {
t *testing.T,
producer apmqueue.Producer,
rdr sdkmetric.Reader,
name string,
want []metricdata.Metrics,
) {
topic := apmqueue.Topic("default-topic")
topic := apmqueue.Topic(name)
producer.Produce(ctx,
apmqueue.Record{Topic: topic, Value: []byte("1")},
apmqueue.Record{Topic: topic, Value: []byte("2")},
apmqueue.Record{Topic: topic, Value: []byte("3")},
)

// Fixes https://github.com/elastic/apm-queue/issues/464
<-time.After(1 * time.Millisecond)
<-time.After(time.Millisecond)

// Close the producer so records are flushed.
require.NoError(t, producer.Close())
Expand Down Expand Up @@ -108,7 +110,7 @@ func TestProducerMetrics(t *testing.T) {
}
ctx, cancel := context.WithTimeout(context.Background(), 0)
defer cancel()
test(ctx, t, producer, rdr, want)
test(ctx, t, producer, rdr, "default-topic", want)
})
t.Run("ContextCanceled", func(t *testing.T) {
producer, rdr := setupTestProducer(t, nil)
Expand Down Expand Up @@ -138,11 +140,11 @@ func TestProducerMetrics(t *testing.T) {
}
ctx, cancel := context.WithCancel(context.Background())
cancel()
test(ctx, t, producer, rdr, want)
test(ctx, t, producer, rdr, "default-topic", want)
})
t.Run("Unknown error reason", func(t *testing.T) {
producer, rdr := setupTestProducer(t, nil)
want := metricdata.Metrics{
want := []metricdata.Metrics{{
Name: "producer.messages.count",
Description: "The number of messages produced",
Unit: "1",
Expand All @@ -164,9 +166,38 @@ func TestProducerMetrics(t *testing.T) {
},
},
},
}
}}
require.NoError(t, producer.Close())
test(context.Background(), t, producer, rdr, []metricdata.Metrics{want})
test(context.Background(), t, producer, rdr, "default-topic", want)
})
t.Run("unknown topic", func(t *testing.T) {
producer, rdr := setupTestProducer(t, nil)
want := []metricdata.Metrics{{
Name: "producer.messages.count",
Description: "The number of messages produced",
Unit: "1",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 3,
Attributes: attribute.NewSet(
attribute.String("outcome", "failure"),
attribute.String(errorReasonKey,
kerr.UnknownTopicOrPartition.Message,
),
attribute.String("namespace", "name_space"),
attribute.String("topic", "name_space-unknown-topic"),
semconv.MessagingSystem("kafka"),
semconv.MessagingDestinationName("unknown-topic"),
semconv.MessagingKafkaDestinationPartition(0),
),
},
},
},
}}
test(context.Background(), t, producer, rdr, "unknown-topic", want)
})
t.Run("Produced", func(t *testing.T) {
producer, rdr := setupTestProducer(t, func(topic string) attribute.KeyValue {
Expand Down Expand Up @@ -281,7 +312,7 @@ func TestProducerMetrics(t *testing.T) {
}},
},
}
test(context.Background(), t, producer, rdr, want)
test(context.Background(), t, producer, rdr, "default-topic", want)
})
t.Run("ProducedWithHeaders", func(t *testing.T) {
producer, rdr := setupTestProducer(t, func(topic string) attribute.KeyValue {
Expand Down Expand Up @@ -365,7 +396,7 @@ func TestProducerMetrics(t *testing.T) {
"key": "value",
"some key": "some value",
})
test(ctx, t, producer, rdr, want)
test(ctx, t, producer, rdr, "default-topic", want)
})
}

Expand Down

0 comments on commit 3f71ee8

Please sign in to comment.