forked from slackhq/go-audit
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkafka_writer.go
94 lines (82 loc) · 2.21 KB
/
kafka_writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package main
import (
"context"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/sirupsen/logrus"
)
// Encoder encodes data from auditd to publish it in Kafka.
type Encoder interface {
Encode(data []byte) (value []byte, err error)
}
// KafkaConfig defines configuration for Kafka Writer.
type KafkaConfig struct {
Enabled bool `yaml:"enabled"`
Attempts int `yaml:"attempts"`
Topic string `yaml:"topic"`
Encoder EncoderConfig `yaml:"encoder"`
Config kafka.ConfigMap `yaml:"config"`
}
// KafkaWriter is an io.Writer that writes to the Kafka.
type KafkaWriter struct {
producer *kafka.Producer
topic string
enc Encoder
}
// NewKafkaWriter creates new KafkaWrite.
func NewKafkaWriter(ctx context.Context, cfg KafkaConfig) (*KafkaWriter, error) {
cfg.Encoder.Topic = cfg.Topic
enc, err := NewEncoder(cfg.Encoder)
if err != nil {
return nil, err
}
p, err := kafka.NewProducer(&cfg.Config)
if err != nil {
return nil, err
}
kw := &KafkaWriter{
producer: p,
topic: cfg.Topic,
enc: enc,
}
go kw.handleResponse(ctx)
return kw, nil
}
// Write writes data to the Kafka, implements io.Writer.
func (kw *KafkaWriter) Write(value []byte) (int, error) {
inFlightLogs.WithLabelValues(hostname).Inc()
value, err := kw.enc.Encode(value)
if err != nil {
return 0, err
}
msg := &kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &kw.topic,
Partition: kafka.PartitionAny,
},
Value: value,
}
// set delivery channel `nil` because we will read from Events channel
if err := kw.producer.Produce(msg, nil); err != nil {
return 0, err
}
return len(value), nil
}
func (kw *KafkaWriter) handleResponse(ctx context.Context) {
for {
select {
case evt := <-kw.producer.Events():
if msg, ok := evt.(*kafka.Message); ok {
if msg.TopicPartition.Error != nil {
logrus.WithError(msg.TopicPartition.Error).Error("failed to producer message")
sentErrorsTotal.WithLabelValues(hostname).Inc()
}
inFlightLogs.WithLabelValues(hostname).Dec()
sentLatencyNanoseconds.WithLabelValues(hostname).Observe(float64(time.Since(msg.Timestamp)))
}
case <-ctx.Done():
kw.producer.Close()
return
}
}
}