Merge pull request netobserv#237 from OlivierCazade/NETOBSERV-390
Netobserv 390: fix kafka transformer
OlivierCazade authored Jun 24, 2022
2 parents 0e670ba + 30d60a2 commit 16d9c36
Showing 3 changed files with 111 additions and 22 deletions.
2 changes: 2 additions & 0 deletions pkg/api/ingest_kafka.go
@@ -25,4 +25,6 @@ type IngestKafka struct {
     StartOffset string `yaml:"startOffset,omitempty" json:"startOffset,omitempty" doc:"FirstOffset (least recent - default) or LastOffset (most recent) offset available for a partition"`
     BatchReadTimeout int64 `yaml:"batchReadTimeout,omitempty" json:"batchReadTimeout,omitempty" doc:"how often (in milliseconds) to process input"`
     Decoder Decoder `yaml:"decoder,omitempty" json:"decoder" doc:"decoder to use (E.g. json or protobuf)"`
+    BatchMaxLen int `yaml:"batchMaxLen,omitempty" json:"batchMaxLen,omitempty" doc:"the number of accumulated flows before being forwarded for processing"`
+    CommitInterval int64 `yaml:"commitInterval,omitempty" json:"commitInterval,omitempty" doc:"the interval (in milliseconds) at which offsets are committed to the broker. If 0, commits will be handled synchronously."`
 }
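For orientation, here is a hedged sketch (not part of this PR) of the extended IngestKafka parameters populated in Go. Only the field names and types come from this diff; the broker, topic, group id, numeric values, and the import path of the api package are illustrative assumptions.

```go
package main

import (
	"fmt"

	// Assumed import path for this repository's api package.
	"github.com/netobserv/flowlogs-pipeline/pkg/api"
)

func main() {
	// Illustrative values; only the field names come from this PR.
	params := api.IngestKafka{
		Brokers:          []string{"kafka:9092"},
		Topic:            "network-flows",
		GroupId:          "flp",
		StartOffset:      "FirstOffset",
		BatchReadTimeout: 300, // flush a pending batch at least every 300 ms
		BatchMaxLen:      500, // ...or as soon as 500 flows have accumulated
		CommitInterval:   500, // commit Kafka offsets every 500 ms
	}
	fmt.Printf("%+v\n", params)
}
```

With this PR, leaving batchMaxLen or commitInterval at zero falls back to the new defaults of 500 flows and 500 ms defined in pkg/pipeline/ingest/ingest_kafka.go below.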
62 changes: 46 additions & 16 deletions pkg/pipeline/ingest/ingest_kafka.go
@@ -36,16 +36,19 @@ type kafkaReadMessage interface {
 }

 type ingestKafka struct {
-    kafkaParams api.IngestKafka
-    kafkaReader kafkaReadMessage
-    decoder     decode.Decoder
-    in          chan string
-    exitChan    <-chan struct{}
-    prevRecords []config.GenericMap // copy of most recently sent records; for testing and debugging
+    kafkaParams    api.IngestKafka
+    kafkaReader    kafkaReadMessage
+    decoder        decode.Decoder
+    in             chan string
+    exitChan       <-chan struct{}
+    prevRecords    []config.GenericMap // copy of most recently sent records; for testing and debugging
+    batchMaxLength int
 }

 const channelSizeKafka = 1000
-const defaultBatchReadTimeout = int64(100)
+const defaultBatchReadTimeout = int64(1000)
+const defaultKafkaBatchMaxLength = 500
+const defaultKafkaCommitInterval = 500

 // Ingest ingests entries from kafka topic
 func (ingestK *ingestKafka) Ingest(out chan<- []config.GenericMap) {
@@ -83,21 +86,36 @@ func (ingestK *ingestKafka) kafkaListener() {
 func (ingestK *ingestKafka) processLogLines(out chan<- []config.GenericMap) {
     var records []interface{}
     duration := time.Duration(ingestK.kafkaParams.BatchReadTimeout) * time.Millisecond
+    flushRecords := time.NewTicker(duration)
     for {
         select {
         case <-ingestK.exitChan:
             log.Debugf("exiting ingestKafka because of signal")
             return
         case record := <-ingestK.in:
             records = append(records, record)
-        case <-time.After(duration): // Maximum batch time for each batch
+            if len(records) >= ingestK.batchMaxLength {
+                log.Debugf("ingestKafka sending %d records, %d entries waiting", len(records), len(ingestK.in))
+                decoded := ingestK.decoder.Decode(records)
+                out <- decoded
+                ingestK.prevRecords = decoded
+                log.Debugf("prevRecords = %v", ingestK.prevRecords)
+                records = []interface{}{}
+            }
+        case <-flushRecords.C: // Maximum batch time for each batch
             // Process batch of records (if not empty)
             if len(records) > 0 {
-                log.Debugf("ingestKafka sending %d records", len(records))
+                if len(ingestK.in) > 0 {
+                    for len(records) < ingestK.batchMaxLength && len(ingestK.in) > 0 {
+                        record := <-ingestK.in
+                        records = append(records, record)
+                    }
+                }
+                log.Debugf("ingestKafka sending %d records, %d entries waiting", len(records), len(ingestK.in))
                 decoded := ingestK.decoder.Decode(records)
+                out <- decoded
                 ingestK.prevRecords = decoded
                 log.Debugf("prevRecords = %v", ingestK.prevRecords)
-                out <- decoded
             }
             records = []interface{}{}
         }
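To make the control flow above easier to follow, here is a small, self-contained sketch of the same batching pattern: a batch is flushed either when it reaches the maximum length or when the ticker fires, after draining whatever is already queued. Names and values are illustrative, not taken from the PR.

```go
package main

import (
	"fmt"
	"time"
)

// batchRecords flushes a batch either when batchMaxLength records have
// accumulated or when the ticker fires (draining queued records first).
func batchRecords(in chan string, out chan<- []string, batchMaxLength int, flushEvery time.Duration) {
	var records []string
	ticker := time.NewTicker(flushEvery)
	defer ticker.Stop()
	for {
		select {
		case r, ok := <-in:
			if !ok {
				return
			}
			records = append(records, r)
			if len(records) >= batchMaxLength {
				out <- records
				records = nil
			}
		case <-ticker.C:
			// Drain anything already queued, up to the batch limit.
			for len(records) < batchMaxLength && len(in) > 0 {
				records = append(records, <-in)
			}
			if len(records) > 0 {
				out <- records
				records = nil
			}
		}
	}
}

func main() {
	in := make(chan string, 1000)
	out := make(chan []string)
	go batchRecords(in, out, 3, 100*time.Millisecond)
	for i := 0; i < 7; i++ {
		in <- fmt.Sprintf("flow-%d", i)
	}
	fmt.Println(len(<-out)) // 3
	fmt.Println(len(<-out)) // 3
	fmt.Println(len(<-out)) // 1 (the leftover is flushed by the ticker)
}
```

The PR applies the same idea inside processLogLines: the new flushRecords ticker replaces the per-iteration time.After timer (which was re-created on every loop), and batchMaxLength bounds how large a single flush can get.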
@@ -145,12 +163,18 @@ func NewIngestKafka(params config.StageParam) (Ingester, error) {
     }
     log.Infof("BatchReadTimeout = %d", jsonIngestKafka.BatchReadTimeout)

+    commitInterval := int64(defaultKafkaCommitInterval)
+    if jsonIngestKafka.CommitInterval != 0 {
+        commitInterval = jsonIngestKafka.CommitInterval
+    }
+
     kafkaReader := kafkago.NewReader(kafkago.ReaderConfig{
         Brokers:        jsonIngestKafka.Brokers,
         Topic:          jsonIngestKafka.Topic,
         GroupID:        jsonIngestKafka.GroupId,
         GroupBalancers: groupBalancers,
         StartOffset:    startOffset,
+        CommitInterval: time.Duration(commitInterval) * time.Millisecond,
     })
     if kafkaReader == nil {
         errMsg := "NewIngestKafka: failed to create kafka-go reader"
@@ -164,12 +188,18 @@ func NewIngestKafka(params config.StageParam) (Ingester, error) {
         return nil, err
     }

+    bml := defaultKafkaBatchMaxLength
+    if jsonIngestKafka.BatchMaxLen != 0 {
+        bml = jsonIngestKafka.BatchMaxLen
+    }
+
     return &ingestKafka{
-        kafkaParams: jsonIngestKafka,
-        kafkaReader: kafkaReader,
-        decoder:     decoder,
-        exitChan:    utils.ExitChannel(),
-        in:          make(chan string, channelSizeKafka),
-        prevRecords: make([]config.GenericMap, 0),
+        kafkaParams:    jsonIngestKafka,
+        kafkaReader:    kafkaReader,
+        decoder:        decoder,
+        exitChan:       utils.ExitChannel(),
+        in:             make(chan string, channelSizeKafka),
+        prevRecords:    make([]config.GenericMap, 0),
+        batchMaxLength: bml,
     }, nil
 }
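As a side note on the new CommitInterval wiring: in segmentio/kafka-go (the client used here), a zero CommitInterval makes offset commits synchronous, while a positive value flushes them to the broker on that period. Below is a hedged sketch assuming only the ReaderConfig fields already used in this file; the broker, topic, and group id are placeholders.

```go
package main

import (
	"time"

	kafkago "github.com/segmentio/kafka-go"
)

// newReader builds a reader whose offset-commit behaviour depends on the
// interval: 0 commits synchronously, a positive value commits asynchronously.
func newReader(commitIntervalMs int64) *kafkago.Reader {
	return kafkago.NewReader(kafkago.ReaderConfig{
		Brokers:        []string{"localhost:9092"}, // placeholder broker
		Topic:          "network-flows",            // placeholder topic
		GroupID:        "flp",                      // placeholder consumer group
		CommitInterval: time.Duration(commitIntervalMs) * time.Millisecond,
	})
}

func main() {
	syncReader := newReader(0)    // synchronous commits
	asyncReader := newReader(500) // asynchronous commits every 500 ms
	defer syncReader.Close()
	defer asyncReader.Close()
}
```

Note that NewIngestKafka above substitutes the 500 ms default when commitInterval is left at zero, so the reader it builds always commits asynchronously.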
69 changes: 63 additions & 6 deletions pkg/pipeline/ingest/ingest_kafka_test.go
@@ -64,6 +64,8 @@ parameters:
 groupBalancers: ["rackAffinity"]
 decoder:
   type: json
+batchMaxLen: 1000
+commitInterval: 1000
 `

 func initNewIngestKafka(t *testing.T, configTemplate string) Ingester {
@@ -85,6 +87,8 @@ func Test_NewIngestKafka1(t *testing.T) {
     require.Equal(t, "FirstOffset", ingestKafka.kafkaParams.StartOffset)
     require.Equal(t, 2, len(ingestKafka.kafkaReader.Config().GroupBalancers))
     require.Equal(t, int64(300), ingestKafka.kafkaParams.BatchReadTimeout)
+    require.Equal(t, int(500), ingestKafka.batchMaxLength)
+    require.Equal(t, time.Duration(500)*time.Millisecond, ingestKafka.kafkaReader.Config().CommitInterval)
 }

 func Test_NewIngestKafka2(t *testing.T) {
@@ -97,6 +101,8 @@ func Test_NewIngestKafka2(t *testing.T) {
     require.Equal(t, "LastOffset", ingestKafka.kafkaParams.StartOffset)
     require.Equal(t, 1, len(ingestKafka.kafkaReader.Config().GroupBalancers))
     require.Equal(t, defaultBatchReadTimeout, ingestKafka.kafkaParams.BatchReadTimeout)
+    require.Equal(t, int(1000), ingestKafka.batchMaxLength)
+    require.Equal(t, time.Duration(1000)*time.Millisecond, ingestKafka.kafkaReader.Config().CommitInterval)
 }

 func removeTimestamp(receivedEntries []config.GenericMap) {
@@ -141,17 +147,16 @@ func Test_IngestKafka(t *testing.T) {
 }

 type fakeKafkaReader struct {
+    readToDo int
     mock.Mock
 }

 var fakeRecord = []byte(`{"Bytes":20801,"DstAddr":"10.130.2.1","DstPort":36936,"Packets":401,"SrcAddr":"10.130.2.13","SrcPort":3100}`)

-var performedRead = false
-
 // ReadMessage runs in the kafka client thread, which blocks until data is available.
-// If data is always available, we have an infinite loop. So we return data only once.
+// If data is always available, we have an infinite loop. So we return data only a specified number of times.
 func (f *fakeKafkaReader) ReadMessage(ctx context.Context) (kafkago.Message, error) {
-    if performedRead {
+    if f.readToDo == 0 {
         // block indefinitely
         c := make(chan struct{})
         <-c
@@ -160,7 +165,7 @@ func (f *fakeKafkaReader) ReadMessage(ctx context.Context) (kafkago.Message, error) {
         Topic: "topic1",
         Value: fakeRecord,
     }
-    performedRead = true
+    f.readToDo -= 1
     return message, nil
 }

@@ -174,7 +179,7 @@ func Test_KafkaListener(t *testing.T) {
     ingestKafka := newIngest.(*ingestKafka)

     // change the ReadMessage function to the mock-up
-    fr := fakeKafkaReader{}
+    fr := fakeKafkaReader{readToDo: 1}
     ingestKafka.kafkaReader = &fr

     // run Ingest in a separate thread
@@ -192,3 +197,55 @@
     require.Equal(t, 1, len(receivedEntries))
     require.Equal(t, test.DeserializeJSONToMap(t, string(fakeRecord)), receivedEntries[0])
 }
+
+func Test_MaxBatchLength(t *testing.T) {
+    ingestOutput := make(chan []config.GenericMap)
+    newIngest := initNewIngestKafka(t, testConfig1)
+    ingestKafka := newIngest.(*ingestKafka)
+
+    // change the ReadMessage function to the mock-up
+    fr := fakeKafkaReader{readToDo: 15}
+    ingestKafka.kafkaReader = &fr
+    ingestKafka.batchMaxLength = 10
+    ingestKafka.kafkaParams.BatchReadTimeout = 10000
+
+    // run Ingest in a separate thread
+    go func() {
+        ingestKafka.Ingest(ingestOutput)
+    }()
+
+    // wait for the data to have been processed
+    receivedEntries := <-ingestOutput
+
+    require.Equal(t, 10, len(receivedEntries))
+}
+
+func Test_BatchTimeout(t *testing.T) {
+    ingestOutput := make(chan []config.GenericMap)
+    newIngest := initNewIngestKafka(t, testConfig1)
+    ingestKafka := newIngest.(*ingestKafka)
+
+    // change the ReadMessage function to the mock-up
+    fr := fakeKafkaReader{readToDo: 5}
+    ingestKafka.kafkaReader = &fr
+    ingestKafka.batchMaxLength = 1000
+    ingestKafka.kafkaParams.BatchReadTimeout = 100
+
+    beforeIngest := time.Now()
+    // run Ingest in a separate thread
+    go func() {
+        ingestKafka.Ingest(ingestOutput)
+    }()
+
+    require.Equal(t, 0, len(ingestOutput))
+    // wait for the data to have been processed
+    receivedEntries := <-ingestOutput
+    require.Equal(t, 5, len(receivedEntries))
+
+    afterIngest := time.Now()
+
+    // We check that the entries were flushed by the timer:
+    // the elapsed time must exceed the timer value, but not by much; 20 ms is our margin here
+    require.LessOrEqual(t, int64(100), afterIngest.Sub(beforeIngest).Milliseconds())
+    require.Greater(t, int64(120), afterIngest.Sub(beforeIngest).Milliseconds())
+}
