[OTE-846] Bazooka sequential clear #2423
Conversation
Force-pushed from 4b866ae to d67a95d.

This reverts commit b047742.

Force-pushed from d67a95d to aa17e0b.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (1)
indexer/services/bazooka/src/index.ts (1)
267-268: Clarify the Error Handling in Comments

The comments explain that concurrent calls caused a `TypeError`, but it's unclear why this error occurs. Providing more context or referencing any relevant issues could help future maintainers understand the root cause. Consider updating the comment:
```diff
-// Concurrent calls to clear all topics caused the failure:
-// TypeError: Cannot destructure property 'partitions' of 'high.pop(...)' as it is undefined.
+// Concurrent calls to `clearKafkaTopic` caused a TypeError due to race conditions
+// when accessing topic metadata. Switching to sequential calls prevents this error.
```
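The destructuring failure is easy to reproduce in isolation; here is a minimal sketch (the `queue` and `takePartitions` names are hypothetical stand-ins for illustration, not kafkajs internals):

```typescript
// Illustrative sketch: destructuring the result of pop() throws exactly this
// TypeError once a shared queue is drained, which is what concurrent callers do.
// `queue` and `takePartitions` are hypothetical stand-ins, not kafkajs internals.
const queue: Array<{ partitions: number[] }> = [{ partitions: [0, 1] }];

function takePartitions(): number[] {
  const entry = queue.pop(); // undefined once the queue is empty
  // The non-null assertion mirrors code that assumes the queue is never empty;
  // at runtime, destructuring undefined throws a TypeError.
  const { partitions } = entry!;
  return partitions;
}

const first = takePartitions(); // succeeds: [0, 1]
let threwTypeError = false;
try {
  takePartitions(); // queue already drained by the first caller
} catch (e) {
  threwTypeError = e instanceof TypeError;
}
```

Two sequential callers never race on the shared state, which is why the sequential loop in this PR avoids the error.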
```typescript
  1,
  config.CLEAR_KAFKA_TOPIC_RETRY_MS,
  config.CLEAR_KAFKA_TOPIC_MAX_RETRIES,
  existingKafkaTopics,
  topic,
);
```
Inconsistency in Parameter Defaults

In the call to `clearKafkaTopic`, the `attempt` parameter is explicitly set to `1`, but the function already defaults `attempt` to `1`. Similarly, `retryMs` and `maxRetries` default to the same values in the function signature.
You can simplify the function call by relying on the default parameter values:
```diff
 await clearKafkaTopic(
-  1,
-  config.CLEAR_KAFKA_TOPIC_RETRY_MS,
-  config.CLEAR_KAFKA_TOPIC_MAX_RETRIES,
   existingKafkaTopics,
   topic,
 );
```
If you need to override the defaults, ensure that the arguments are necessary.
📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test and benchmark the code to ensure it meets the requirements.

```typescript
await clearKafkaTopic(
  existingKafkaTopics,
  topic,
);
```
```typescript
for (const topic of KAFKA_TOPICS) {
  await clearKafkaTopic(
    1,
    config.CLEAR_KAFKA_TOPIC_RETRY_MS,
    config.CLEAR_KAFKA_TOPIC_MAX_RETRIES,
    existingKafkaTopics,
    topic,
  );
}
```
🛠️ Refactor suggestion
Potential Performance Impact of Sequential Topic Clearing
Changing from concurrent to sequential clearing of Kafka topics resolves the `TypeError`, but it may introduce performance bottlenecks, especially when dealing with a large number of topics. Consider implementing controlled concurrency to improve performance while avoiding the error.

You can use a concurrency control utility like `p-limit` to limit the number of concurrent operations. Here's how you might adjust the code:
```diff
+import pLimit from 'p-limit';

 async function clearKafkaTopics(
   existingKafkaTopics: string[],
 ): Promise<void> {
+  const limit = pLimit(5); // Adjust the concurrency level as appropriate
+
   // Concurrent calls to clear all topics caused the failure:
   // TypeError: Cannot destructure property 'partitions' of 'high.pop(...)' as it is undefined.
-  for (const topic of KAFKA_TOPICS) {
-    await clearKafkaTopic(
-      1,
-      config.CLEAR_KAFKA_TOPIC_RETRY_MS,
-      config.CLEAR_KAFKA_TOPIC_MAX_RETRIES,
-      existingKafkaTopics,
-      topic,
-    );
-  }
+  const clearTopicPromises = KAFKA_TOPICS.map((topic) =>
+    limit(() =>
+      clearKafkaTopic(
+        1,
+        config.CLEAR_KAFKA_TOPIC_RETRY_MS,
+        config.CLEAR_KAFKA_TOPIC_MAX_RETRIES,
+        existingKafkaTopics,
+        topic,
+      ),
+    ),
+  );
+  await Promise.all(clearTopicPromises);
 }
```
This approach balances performance with reliability by limiting the number of concurrent operations.
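If adding the `p-limit` dependency is undesirable, the same pattern can be hand-rolled in a few lines. Below is a minimal sketch of such a limiter; `createLimiter` and `fakeClearTopic` are hypothetical names for illustration, not part of bazooka or of `p-limit` itself:

```typescript
// Minimal stand-alone concurrency limiter in the spirit of p-limit.
// createLimiter(n) returns a wrapper that runs at most n tasks at once;
// excess tasks queue and resume as slots free up.
function createLimiter(limit: number) {
  let active = 0;
  const waiting: Array<() => void> = [];

  return async function run<T>(task: () => Promise<T>): Promise<T> {
    // Wait for a free slot before starting the task.
    while (active >= limit) {
      await new Promise<void>((resolve) => waiting.push(resolve));
    }
    active += 1;
    try {
      return await task();
    } finally {
      active -= 1;
      waiting.shift()?.(); // wake one queued task, if any
    }
  };
}

// Demo: "clear" 10 topics with at most 3 in flight, tracking peak concurrency.
const topics = Array.from({ length: 10 }, (_, i) => `topic-${i}`);
let inFlight = 0;
let peak = 0;

async function fakeClearTopic(topic: string): Promise<string> {
  inFlight += 1;
  peak = Math.max(peak, inFlight);
  await new Promise((r) => setTimeout(r, 10)); // simulate a broker round-trip
  inFlight -= 1;
  return topic;
}

const limit3 = createLimiter(3);
const done = Promise.all(topics.map((t) => limit3(() => fakeClearTopic(t))));
```

Whether a small in-repo helper or the well-tested `p-limit` package is preferable is a judgment call; for a one-off tool like bazooka, sequential clearing may also simply be fast enough.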
Committable suggestion was skipped due to low confidence.
@Mergifyio backport release/indexer/v7.x

✅ Backports have been created
(cherry picked from commit b57eb4c)
@Mergifyio release/indexer/v6.x

❌ Sorry but I didn't understand the command. Please consult the commands documentation 📚.

https://github.com/Mergifyio backport release/indexer/v6.x

✅ Backports have been created
(cherry picked from commit b57eb4c)
Co-authored-by: jerryfan01234 <[email protected]>
Changelist
[Describe or list the changes made in this PR]
Test Plan
Multiple dev environments and staging were getting this error when running bazooka. Uploaded this image to a dev environment, updated bazooka, and ran bazooka successfully.
Author/Reviewer Checklist
- If this PR is state-breaking, add the `state-breaking` label.
- If this PR is indexer-postgres-breaking, add the `indexer-postgres-breaking` label.
- If this PR changes `PrepareProposal` or `ProcessProposal`, manually add the label `proposal-breaking`.
- For a new feature, add the label `feature:[feature-name]`.
- To backport, add the label `backport/[branch-name]`.
- Otherwise, add one of the labels `refactor`, `chore`, `bug`.