fix(kafka sink): remove restriction on data type #18266
Hi @hvtuananh,
Thanks for this!
I think it'd be good to add a test. I'm already seeing a few other places that need to be updated:
vector/src/sinks/kafka/request_builder.rs
Lines 67 to 87 in 3a6af99
    fn get_key(event: &Event, key_field: &Option<String>) -> Option<Bytes> {
        key_field.as_ref().and_then(|key_field| match event {
            Event::Log(log) => log
                .get(key_field.as_str())
                .map(|value| value.coerce_to_bytes()),
            Event::Metric(metric) => metric
                .tags()
                .and_then(|tags| tags.get(key_field))
                .map(|value| value.to_owned().into()),
            _ => None,
        })
    }

    fn get_timestamp_millis(event: &Event) -> Option<i64> {
        match &event {
            Event::Log(log) => log.get_timestamp().and_then(|v| v.as_timestamp()).copied(),
            Event::Metric(metric) => metric.timestamp(),
            _ => None,
        }
        .map(|ts| ts.timestamp_millis())
    }
And, potentially, here:
vector/src/sinks/kafka/request_builder.rs
Lines 89 to 120 in 3a6af99
    fn get_headers(event: &Event, headers_key: &Option<String>) -> Option<OwnedHeaders> {
        headers_key.as_ref().and_then(|headers_key| {
            if let Event::Log(log) = event {
                if let Some(headers) = log.get(headers_key.as_str()) {
                    match headers {
                        Value::Object(headers_map) => {
                            let mut owned_headers = OwnedHeaders::new_with_capacity(headers_map.len());
                            for (key, value) in headers_map {
                                if let Value::Bytes(value_bytes) = value {
                                    owned_headers = owned_headers.insert(Header {
                                        key,
                                        value: Some(value_bytes.as_ref()),
                                    });
                                } else {
                                    emit!(KafkaHeaderExtractionError {
                                        header_field: headers_key
                                    });
                                }
                            }
                            return Some(owned_headers);
                        }
                        _ => {
                            emit!(KafkaHeaderExtractionError {
                                header_field: headers_key
                            });
                        }
                    }
                }
            }
            None
        })
    }
Those places have different logic depending on the incoming event type, so they need to be updated to handle traces.
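To make the shape of that change concrete, here is a rough, self-contained sketch of what trace-aware versions of these helpers might look like. It does not use Vector's real types: `Value`, `Fields`, and `Event` below are simplified, hypothetical stand-ins, and the `timestamp_field` parameter is an assumption made only so the example is self-contained. It also assumes traces expose fields the same way logs do, which would need to be confirmed against the actual `TraceEvent` API.

```rust
use std::collections::BTreeMap;

// Hypothetical, simplified stand-ins for Vector's `Value` and `Event` types.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Bytes(Vec<u8>),
    TimestampMillis(i64),
    Object(BTreeMap<String, Value>),
}

type Fields = BTreeMap<String, Value>;

enum Event {
    Log(Fields),
    Metric {
        tags: BTreeMap<String, String>,
        timestamp_millis: Option<i64>,
    },
    Trace(Fields),
}

// Logs and traces share one arm, so traces no longer fall through to `None`.
// Unlike the real code, this sketch only accepts `Value::Bytes` as a key.
fn get_key(event: &Event, key_field: &Option<String>) -> Option<Vec<u8>> {
    key_field.as_ref().and_then(|key_field| match event {
        Event::Log(fields) | Event::Trace(fields) => match fields.get(key_field) {
            Some(Value::Bytes(bytes)) => Some(bytes.clone()),
            _ => None,
        },
        Event::Metric { tags, .. } => tags.get(key_field).map(|tag| tag.clone().into_bytes()),
    })
}

// Assumes a trace stores its timestamp under the same field name a log would.
fn get_timestamp_millis(event: &Event, timestamp_field: &str) -> Option<i64> {
    match event {
        Event::Log(fields) | Event::Trace(fields) => match fields.get(timestamp_field) {
            Some(Value::TimestampMillis(millis)) => Some(*millis),
            _ => None,
        },
        Event::Metric { timestamp_millis, .. } => *timestamp_millis,
    }
}

// Returns header pairs instead of rdkafka's `OwnedHeaders` to stay dependency-free.
fn get_headers(event: &Event, headers_key: &Option<String>) -> Option<Vec<(String, Vec<u8>)>> {
    let headers_key = headers_key.as_ref()?;
    let fields = match event {
        Event::Log(fields) | Event::Trace(fields) => fields,
        Event::Metric { .. } => return None,
    };
    match fields.get(headers_key) {
        Some(Value::Object(map)) => Some(
            map.iter()
                .filter_map(|(key, value)| match value {
                    Value::Bytes(bytes) => Some((key.clone(), bytes.clone())),
                    // The real code emits KafkaHeaderExtractionError here.
                    _ => None,
                })
                .collect(),
        ),
        _ => None,
    }
}
```

The main design point is the or-pattern `Event::Log(fields) | Event::Trace(fields)`: if traces can be read like logs, each helper needs only one widened arm rather than a duplicated branch.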
As to the question of where to add tests, I think adding an integration test here would be suitable: https://github.com/vectordotdev/vector/blob/master/src/sinks/kafka/tests.rs
Hello folks. Any updates on this one? ;)
Thank you for your contribution to Vector! To keep the repository tidy and focused, we are closing this PR due to inactivity. We greatly appreciate the time and effort you've put into this PR. If you'd like to continue working on it, we encourage you to re-open the PR and we would be delighted to review it again. Before re-opening, please use
closes #18252