NETOBSERV-258: add TimeReceived in Kafka transformer #235

Merged
merged 1 commit into from
Jun 20, 2022

mariomac

If a flow is ingested directly by the Kafka transformer instance (e.g. the eBPF agent submits flows there directly), the transformer needs to add the TimeReceived field as soon as it unmarshals the flow data.

@codecov-commenter

Codecov Report

Merging #235 (d20a08e) into main (4549b72) will increase coverage by 0.02%.
The diff coverage is 100.00%.

@@            Coverage Diff             @@
##             main     #235      +/-   ##
==========================================
+ Coverage   61.71%   61.74%   +0.02%     
==========================================
  Files          67       67              
  Lines        3897     3900       +3     
==========================================
+ Hits         2405     2408       +3     
  Misses       1346     1346              
  Partials      146      146              
Flag Coverage Δ
unittests 61.74% <100.00%> (+0.02%) ⬆️

Impacted Files Coverage Δ
pkg/pipeline/decode/decode_json.go 92.00% <100.00%> (+1.09%) ⬆️

Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 4549b72...d20a08e. Read the comment docs.

@@ -42,6 +43,9 @@ func (c *DecodeJson) Decode(in []interface{}) []config.GenericMap {
			continue
		}
		decodedLine2 := make(config.GenericMap, len(decodedLine))
		// flows directly ingested by flp-transformer won't have this field, so we need to add it
		// here. If the received line already contains the field, it will be overridden later
		decodedLine2["TimeReceived"] = time.Now().Unix()
Contributor

Maybe we should add a check to ensure there is no existing TimeReceived field?

Author

In that case, if there is no TimeReceived field, it will be added to the current payload. If the received line already contains a TimeReceived field, that value is copied in after this, overriding the value that we set here.

Contributor

Is that what we want? To override TimeReceived if the field is already there?

Member

It's OK as a quick fix, but we can think about it and iterate from there.

Maybe it would make more sense to have it in the ingester stage rather than in the JSON decoder? Telling when something was received looks more like an ingester responsibility.
And I agree with Olivier that, if the field is already present, it shouldn't be overwritten. E.g. if we have:

(ovs) -> ipfix-ingester -> kafka-write -> kafka-read -> loki

the received time should be the one from ipfix-ingester... or maybe the opposite, on the Loki side?

As we use it to measure the collection latency (by comparing it to FlowEndTime), maybe it actually makes more sense to deal with it directly in the Loki write stage, after all.

Author

I explained it wrong. The TimeReceived field is never overridden if it's already there (please check the tests).

@jotak currently we set the value in the IPFIX or protobuf ingesters, but we added a new scenario: the eBPF agent can send flows directly to the flowlogs-pipeline-transformer instance as JSON via Kafka.

In that case, we are bypassing the ingesters, so the receive time needs to be set in the transformer instance.

But if we agree that the actual ingest time should be taken right before writing to Loki, this could be removed and we wouldn't need to consider this special case.

@mariomac mariomac merged commit c418a01 into netobserv:main Jun 20, 2022
@mariomac mariomac deleted the timereceived branch June 20, 2022 07:52