-
Notifications
You must be signed in to change notification settings - Fork 135
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[OTE-846] Bazooka sequential clear #2423
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -264,14 +264,17 @@ async function partitionKafkaTopics(): Promise<void> { | |
async function clearKafkaTopics( | ||
existingKafkaTopics: string[], | ||
): Promise<void> { | ||
await Promise.all( | ||
_.map(KAFKA_TOPICS, | ||
clearKafkaTopic.bind(null, | ||
1, | ||
config.CLEAR_KAFKA_TOPIC_RETRY_MS, | ||
config.CLEAR_KAFKA_TOPIC_MAX_RETRIES, | ||
existingKafkaTopics)), | ||
); | ||
// Concurrent calls to clear all topics caused the failure: | ||
// TypeError: Cannot destructure property 'partitions' of 'high.pop(...)' as it is undefined. | ||
for (const topic of KAFKA_TOPICS) { | ||
await clearKafkaTopic( | ||
1, | ||
config.CLEAR_KAFKA_TOPIC_RETRY_MS, | ||
config.CLEAR_KAFKA_TOPIC_MAX_RETRIES, | ||
existingKafkaTopics, | ||
topic, | ||
); | ||
} | ||
Comment on lines
+269
to
+277
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Potential Performance Impact of Sequential Topic Clearing Changing from concurrent to sequential clearing of Kafka topics resolves the You can use a concurrency control utility like +import pLimit from 'p-limit';
async function clearKafkaTopics(
existingKafkaTopics: string[],
): Promise<void> {
+ const limit = pLimit(5); // Adjust the concurrency level as appropriate
+
// Concurrent calls to clear all topics caused the failure:
// TypeError: Cannot destructure property 'partitions' of 'high.pop(...)' as it is undefined.
- for (const topic of KAFKA_TOPICS) {
- await clearKafkaTopic(
- 1,
- config.CLEAR_KAFKA_TOPIC_RETRY_MS,
- config.CLEAR_KAFKA_TOPIC_MAX_RETRIES,
- existingKafkaTopics,
- topic,
- );
- }
+ const clearTopicPromises = KAFKA_TOPICS.map((topic) =>
+ limit(() =>
+ clearKafkaTopic(
+ 1,
+ config.CLEAR_KAFKA_TOPIC_RETRY_MS,
+ config.CLEAR_KAFKA_TOPIC_MAX_RETRIES,
+ existingKafkaTopics,
+ topic,
+ ),
+ ),
+ );
+ await Promise.all(clearTopicPromises);
} This approach balances performance with reliability by limiting the number of concurrent operations.
|
||
} | ||
|
||
export async function clearKafkaTopic( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inconsistency in Parameter Defaults
In the call to
clearKafkaTopic
, theattempt
parameter is explicitly set to1
, but the functionclearKafkaTopic
already defaultsattempt
to1
. Similarly,retryMs
andmaxRetries
default to the same values in the function signature.You can simplify the function call by relying on the default parameter values:
If you need to override the defaults, ensure that the arguments are necessary.
📝 Committable suggestion