Kafka add gauge v1 #33408
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #33408 +/- ##
=============================================
+ Coverage 57.47% 60.42% +2.94%
- Complexity 1474 15172 +13698
=============================================
Files 985 2760 +1775
Lines 155802 267597 +111795
Branches 1076 12161 +11085
=============================================
+ Hits 89550 161700 +72150
- Misses 64035 99435 +35400
- Partials 2217 6462 +4245
Flags with carried forward coverage won't be shown. View full report in Codecov by Sentry.
2791b23
to
b91a77f
Compare
R: @sjvanrossum for the Kafka IO part, thanks in advance!
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment
b91a77f
to
948dfe6
Compare
Run Java PreCommit
R: @johnjcasey for the SDK portion of it.
.../java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverter.java
Run Java_Pulsar_IO_Direct PreCommit
11bee27
to
b9d2f2b
Compare
Run Java PreCommit
Run Java_GCP_IO_Direct PreCommit
Run Java PreCommit
Run Java PreCommit
Run Java_GCP_IO_Direct PreCommit
Run Java_GCP_IO_Direct PreCommit
Run Java PreCommit
sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/NoOpGauge.java
...a/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java
/**
 * @param topicName topicName
 * @param partitionId partitionId for the topic Only included in the metric key if
 *     'supportsMetricsDeletion' is enabled.
 * @param backlog backlog for the topic Only included in the metric key if
 *     'supportsMetricsDeletion' is enabled.
 */
" Only" -> ". Only"?
Removed, thanks for catching that.
@@ -71,11 +79,17 @@ abstract class KafkaMetricsImpl implements KafkaMetrics {

  abstract HashMap<String, ConcurrentLinkedQueue<Duration>> perTopicRpcLatencies();

  static ConcurrentHashMap<String, Gauge> backlogGauges = new ConcurrentHashMap<String, Gauge>();

  abstract HashMap<String, Long> perTopicPartitionBacklogs();
If an instance of this class may be concurrently updated, then HashMap needs to be replaced (ditto for the existing HashMap fields). Use ConcurrentHashMap instead.
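To illustrate the suggestion above, here is a minimal sketch of a backlog store backed by ConcurrentHashMap. BacklogStore, its methods, and the "topic-partition" key format are hypothetical names made up for this example; they are not taken from KafkaMetricsImpl.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a metrics container; not the actual KafkaMetricsImpl.
final class BacklogStore {
  // ConcurrentHashMap allows concurrent put/get without external locking,
  // which a plain HashMap does not guarantee.
  private final ConcurrentHashMap<String, Long> perTopicPartitionBacklogs =
      new ConcurrentHashMap<>();

  // Records the latest backlog for a topic/partition; the last write wins.
  // The "topic-partition" key format is an assumption for this sketch.
  void updateBacklog(String topic, int partitionId, long backlog) {
    perTopicPartitionBacklogs.put(topic + "-" + partitionId, backlog);
  }

  Long backlogFor(String topic, int partitionId) {
    return perTopicPartitionBacklogs.get(topic + "-" + partitionId);
  }

  int size() {
    return perTopicPartitionBacklogs.size();
  }

  // Writes distinct keys from several threads and returns the populated store.
  static BacklogStore writeConcurrently(int threads, int partitionsPerThread) {
    BacklogStore store = new BacklogStore();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int t = 0; t < threads; t++) {
      final int thread = t;
      pool.submit(
          () -> {
            for (int p = 0; p < partitionsPerThread; p++) {
              store.updateBacklog("topic" + thread, p, (long) p);
            }
          });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("writers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return store;
  }
}
```

With a plain HashMap in place of the ConcurrentHashMap, the same concurrent writes would have no defined outcome, which is the concern the comment raises.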
Slightly unrelated, but why doesn't perTopicRpcLatencies use a gauge or sum as the value type?
What would the sum represent? The sum of latencies? Each individual latency is important, and a sum would lose that information.
A gauge isn't quite clear either: if two concurrent RPCs complete, which value do you report?
A histogram of values provides more information and lets us see the spread of values.
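The trade-off described in this reply can be sketched in plain Java, without any Beam API. LatencySummaries, its method names, and the fixed-width bucketing scheme are assumptions made for illustration only; this is not how the PR or Beam's histogram metric is implemented.

```java
import java.util.List;

// Hypothetical helper comparing three ways to summarize RPC latencies.
final class LatencySummaries {
  // Sum: collapses every observation into a single number.
  static long sum(List<Long> latenciesMs) {
    long total = 0;
    for (long v : latenciesMs) {
      total += v;
    }
    return total;
  }

  // Gauge-style: only the most recently reported value survives.
  static long lastValue(List<Long> latenciesMs) {
    return latenciesMs.get(latenciesMs.size() - 1);
  }

  // Histogram: bucket i counts values in [i * width, (i + 1) * width);
  // the final bucket also absorbs anything larger (overflow).
  static int[] histogram(List<Long> latenciesMs, long bucketWidthMs, int numBuckets) {
    int[] counts = new int[numBuckets];
    for (long v : latenciesMs) {
      int idx = (int) Math.min(v / bucketWidthMs, numBuckets - 1);
      counts[idx]++;
    }
    return counts;
  }
}
```

For the latencies 5, 7, 250, and 6 ms, the sum is 268 and the last value is 6, neither of which reveals the single slow call. A histogram with 100 ms buckets gives counts of 3, 0, and 1, so the outlier stays visible.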
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -743,6 +747,16 @@ private void reportBacklog() {
    backlogElementsOfSplit.set(splitBacklogMessages);
  }

  private void reportBacklogMetrics() {
Looks like this can be merged with reportBacklog (potentially rename that method to reportBacklogMetrics or updateBacklogMetrics).
No, I explicitly moved it out to be separate, since reportBacklog() is called twice, and we only need to do this once (when we advance to the next record).
SGTM
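The reasoning in this thread, keeping the two methods separate because they run at different frequencies, can be sketched as follows. ReaderSketch, its counters, and the placement of the two reportBacklog call sites are hypothetical; the actual call sites in KafkaUnboundedReader are not shown in this conversation.

```java
// Hypothetical reader; only the call-frequency pattern mirrors the discussion.
final class ReaderSketch {
  int backlogUpdates = 0; // how often reportBacklog ran
  int gaugeReports = 0; // how often reportBacklogMetrics ran
  private long splitBacklog = 0;

  // Cheap bookkeeping that is invoked from more than one place.
  void reportBacklog(long currentBacklog) {
    splitBacklog = currentBacklog;
    backlogUpdates++;
  }

  // Per-partition gauge reporting; only needed once per advanced record.
  void reportBacklogMetrics() {
    gaugeReports++;
  }

  // Assumed shape of an advance step: two backlog updates, one metrics report.
  boolean advance(long currentBacklog) {
    reportBacklog(currentBacklog); // first (assumed) call site
    reportBacklog(Math.max(0, currentBacklog - 1)); // second (assumed) call site
    reportBacklogMetrics(); // runs once, after moving to the next record
    return true;
  }

  long splitBacklog() {
    return splitBacklog;
  }
}
```

Merging the two methods would make the gauge reporting run on every reportBacklog call, doubling the work in this sketch.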
f33b235
to
925290b
Compare
Run Java_IOs_Direct PreCommit
5baabd8
to
11137f9
Compare
Run Java PreCommit
11137f9
to
099ab06
Compare
Run Java_Hadoop_IO_Direct PreCommit
Run Java PreCommit
7e1ad4c
to
70a5f63
Compare
Run Java PreCommit
70a5f63
to
949f3f5
Compare
Run Java_IOs_Direct PreCommit
Run Java PreCommit
949f3f5
to
fc42673
Compare
* add counter stuff
* Address John's comments about separating conversion and validation checks
* address Steven's comments
* another round of comments
---------
Co-authored-by: Naireen <[email protected]>
Add per-worker gauge support to report per-partition backlog for Kafka with the Java legacy worker on Dataflow.