Commit 4cbae11
fix(kafka source): fix acknowledgement handling during shutdown and rebalance events (#17497)
* test(kafka source): integration tests for acknowledgement handling during shutdown and rebalance events
When running locally, this is most easily tested against an existing Kafka
topic pre-populated with, say, 100k messages:
❯ kcat -b $BROKER_ADDR -P -t TestTopic -K : -l <(for i in $(seq 1 100000); do echo "${i}:{\"value\": ${i}}"; done);
...then run the tests targeting that topic via environment variables. The timings can be a bit finicky,
so KAFKA_SHUTDOWN_DELAY controls how long to run the first consumer before shutting it down (drain-at-shutdown test), and
KAFKA_CONSUMER_DELAY controls the delay between starting new consumers during the rebalancing test.
❯ KAFKA_SEND_COUNT=0 \
KAFKA_EXPECT_COUNT=100000 \
KAFKA_TEST_TOPIC=TestTopic \
KAFKA_CONSUMER_DELAY=5000 \
KAFKA_SHUTDOWN_DELAY=5000 \
KAFKA_HOST=$BROKER_ADDR \
KAFKA_PORT=9092 \
cargo test --lib --no-default-features -F sources-kafka -F kafka-integration-tests drains_acknowledgement
* fix(kafka source): drain pending acknowledgements on shutdown and rebalance
* fix(kafka source): performance improvements for acknowledgement handling on many partitions
Instead of tokio's StreamMap, which gets very slow once more than a handful of partitions are involved,
use a dedicated task and forwarding channel for each partition. This introduces a
little bookkeeping, but scales well to at least hundreds of
partitions.
* clippy fixes, and remove unnecessary last_offset tracking
* cargo fmt again
* fmt
* clean up handle_messages loop and add a tracing span for metrics collection
* fixup changes lost after merging master
* clippy warning
* enhancement(kafka source): kafka source uses a dedicated task per partition to consume & acknowledge messages
* make the spelling checker happy, maybe?
* emit a debug log instead of panicking if a shutdown happens during a consumer rebalance
* improved partition eof handling
* add OptionFuture to drain deadline and EOF handling, and use is_subset to detect when all expected partitions are finished draining
* replace OnceCell with OnceLock
* cargo fmt
* create clear distinction between consuming and draining states
* add "complete" as a terminal state, and "keep_consuming", "keep_draining", and "complete" methods for descriptive state (non)transitions
* use state transition methods consistently for all state transitions
* slightly clearer assertion messages about what is expected
* update obsolete comment, make coordinator loop condition explicit
* use keep_consuming from the drain_timeout while consuming handler
* rely solely on adding/removing entries in expect_drain to detect when draining is complete
* fix comment :P
* clippy/fmt fixes
* minor cleanup: during shutdown, use is_drain_complete to detect the already-finished state
* integration test uses `FuturesUnordered` for better performance
Co-authored-by: Doug Smith <[email protected]>
* use FuturesUnordered
* use 6 partitions for integration test
* integration test using 125k messages
* add drain_timeout_ms option for kafka source
* enforce drain_timeout_ms < session_timeout_ms when building kafka source
* generate component docs
* use Option::{unzip, map_or} methods
Co-authored-by: Bruce Guenter <[email protected]>
* remove OnceLock on callback channel sender, and other review cleanups
- avoid panic in case split_partition_queue returns None
- remove `pub` markers that are not needed
- add comments around drain coordination signalling
- cargo fmt
* members of Keys struct are cloned once per consumed partition, instead of entire config object
* cargo fmt and fix clippy warnings
---------
Co-authored-by: Doug Smith <[email protected]>
Co-authored-by: Bruce Guenter <[email protected]>