
perf: Adds a caching to shard management counts to alleviate shard scanning O(n) #530

Closed
wants to merge 459 commits

Conversation

@astubbs (Contributor) commented Dec 21, 2022

Description...

Checklist

  • Documentation (if applicable)
  • Changelog
  • Final speed comparison to origin

astubbs and others added 30 commits July 23, 2021 16:01
Fixes confluentinc#60 - upgrade AK to 2.7.0.

Adds 2.7.0 as a stable build.
This messes with the shutdown process of the Producer, i.e. committing final transactions, offsets etc.
refactor: Extract common Reactor and Vert.x parts
Prevents the extension modules from incorrectly inheriting core methods
that would be broken if used.

Step 1:
new parent: rename
Remove deprecated test class
Base class refactor - removes core api from extension modules
Don't know how this made it through CI.

Missing copyright and updated readme.
… see #maxConcurrency

Vert.x concurrency control previously relied on the Vert.x WebClient controlling
the concurrency setting per host. This breaks when multiple hosts are used -
now the max concurrency can go beyond the setting. This change migrates to the
new ExternalEngine system, which controls concurrency properly.
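The difference between a per-host cap and an engine-level cap can be sketched with a plain semaphore. This is an illustrative sketch only, not the real ExternalEngine code; the class and method names below are hypothetical.

```java
import java.util.concurrent.Semaphore;

// Minimal sketch: a single global permit pool caps total in-flight requests
// across ALL hosts. A per-host limit, by contrast, lets each of N hosts run
// up to the limit, so total concurrency can exceed the configured setting.
// Hypothetical illustration; not the actual parallel-consumer implementation.
class GlobalConcurrencyCap {
    private final int maxConcurrency;
    private final Semaphore permits;

    GlobalConcurrencyCap(int maxConcurrency) {
        this.maxConcurrency = maxConcurrency;
        this.permits = new Semaphore(maxConcurrency);
    }

    /** Try to start a request against any host; returns false when at the cap. */
    boolean tryStart() {
        return permits.tryAcquire();
    }

    /** Must be called when a request completes, whether it succeeded or failed. */
    void complete() {
        permits.release();
    }

    /** Requests currently in flight, for monitoring. */
    int inFlight() {
        return maxConcurrency - permits.availablePermits();
    }
}
```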

Turn off performance comparison unit test - too brittle for CI.
Under unrealistically high load with no-op processing, the broker poller unblocking a partition could cause ProcessingShard to skip forward in its entries and take work out of order.
This was discovered while fixing a synthetic high-performance benchmark, after an O(n) algorithm was fixed to O(1), creating the state for the race condition to appear. It probably could not happen without that fix, as it is related to the performance of certain parts of the system.
@what-the-diff

what-the-diff bot commented Jan 13, 2023

  • Added a new run configuration for performance tests
  • Updated the pom file to include ProgressBar class in bytecode enhancement process
  • Removed log statement from poll method of ParallelEoSStreamProcessor class as it was causing too much logging and slowing down test execution time by ~10% (Encoding back pressure system so that offset payloads are prevented from being too large #47)
  • Fixed a bug where interrupting the control thread would cause an exception if the work mailbox is not currently being polled, which could happen when no records are available on the broker or all partitions have been revoked due to a rebalance event (https://github.com/confluentinc/parallel-consumer#bugfix--interrupting-controlthread---exception)
  • Changed the offset map encoding logic so that partition state can be set into blocked mode once the payload size exceeds the pressure threshold but is still below the max allowed metadata size limit; this prevents further messages from being processed until record successes reduce the encoded payload size below the pressure threshold again - https://github.com/confluentinc/parallel-consumer#feature--offsetmapencodingbackpressurethresholds
  • The function workIsWaitingToBeProcessed() was renamed to isWorkWaitingToBeProcessed().
  • A new field, totalShardEntriesNotInFlight, was added, which tracks the number of entries across all shards that are not currently being processed by a consumer thread. It is updated when an entry is removed from a shard (onSuccess or onFailure), when an entry is added back into the retry queue after a processing failure, and decremented whenever records are taken out for processing. This keeps a running count of unprocessed records without having to iterate over every record in every shard as before, which could be very expensive.
  • To avoid starvation, if partition blocking occurs while getting work from shards, scanning stops immediately instead of continuing to the resume point. Even though other partitions may have messages ready for consumption, blocked partitions prevent progress with offset commits, so repeatedly checking those available-but-unprocessable messages would only waste CPU cycles.
  • Added logging around why certain WorkContainers aren't taken as work yet: the retry delay hasn't passed; the work already succeeded; it already failed and is waiting out its retry period; or its partition has been temporarily revoked due to too much lag behind committed offsets. Slow queues are logged separately using a rate-limiting mechanism, so at most one warning is logged every 5 seconds for the same issue, preventing the logs from filling with repeated warnings.
  • The testLargeVolumeInMemory() method was changed to use a different number of records.
  • A new function called getTotalSizeOfAllShards() was added in the WorkManager class, which returns the total size of all shards not currently being processed by any consumer instance (i.e., it is waiting for selection). This value is used as part of an if statement that determines whether or not more work should be downloaded from Kafka and put into memory before processing begins on existing data already in memory.
  • Added a new method to KafkaTestUtils, which allows the user to add records directly into the MockConsumer
  • Updated all tests that use generateRecords() in order for them not to fail due to ordering issues with offsets and partitions (see note on deprecated methods)
  • Fixed some bugs where we were using incorrect types of collections when adding/removing elements from lists or maps - this was causing test failures as well as potential race conditions if used in production code
  • Removed unused imports and variables throughout project files
  • Added @SneakyThrows to JStreamParallelEoSStreamProcessorTest.java
  • Removed @BeforeEach from ParallelEoSStreamProcessorTest.java
  • Changed new ConcurrentLinkedQueue<>() to new ConcurrentLinkedDeque<>() in OffsetEncodingBackPressureUnitTest
  • Replaced assertThat with ManagedTruth assertions for better error messages when the test fails (eos-test)
  • Added a new test to check the final rate of progress bar is at least some value.
  • Changed an assertion in WorkManagerTest class from numberOfWorkQueuedInShardsAwaitingSelection() to totalShardEntriesNotInFlight().
  • Enabled debug logging for DynamicLoadFactor and ParallelEoSStreamProcessorTest classes by removing comments before logger statements in logback-test file.
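The cached-count technique summarized in the bullets above (the core of this PR) can be sketched as follows. This is a standalone illustration, not the real WorkManager code: only totalShardEntriesNotInFlight and getTotalSizeOfAllShards come from the summary above; every other name is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Sketch of replacing an O(n) scan over every shard entry with an O(1) read
// of a counter that is maintained incrementally on every add/remove.
// Hypothetical illustration; not the actual parallel-consumer classes.
class ShardCountSketch {
    private final Map<String, Queue<Long>> shards = new HashMap<>();
    // Cached counter, updated on every mutation instead of recomputed by scanning.
    private long totalShardEntriesNotInFlight = 0;

    void addWork(String shardKey, long offset) {
        shards.computeIfAbsent(shardKey, k -> new ArrayDeque<>()).add(offset);
        totalShardEntriesNotInFlight++; // increment when work enters a shard
    }

    Long takeWork(String shardKey) {
        Queue<Long> shard = shards.get(shardKey);
        if (shard == null || shard.isEmpty()) return null;
        totalShardEntriesNotInFlight--; // decrement when work goes in-flight
        return shard.poll();
    }

    void onFailureRetry(String shardKey, long offset) {
        // Failed work re-enters its shard, so the cached count goes back up.
        addWork(shardKey, offset);
    }

    // O(1): read the cached field rather than iterating all shard entries.
    long getTotalSizeOfAllShards() {
        return totalShardEntriesNotInFlight;
    }
}
```

The trade-off is that every code path mutating a shard must remember to adjust the counter, which is one reason the real change needed the follow-up race-condition fix described elsewhere in this thread.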

@astubbs (Contributor, Author) left a comment:

notes

@@ -72,12 +72,31 @@ void backPressureShouldPreventTooManyMessagesBeingQueuedForProcessing() throws O

var completes = LongStreamEx.of(numberOfRecords).filter(x -> !blockedOffsets.contains(x)).boxed().toList();


{
var totalSizeOfAllShards = wm.getTotalSizeOfAllShards();
@astubbs (Contributor, Author) commented:
remove

@astubbs (Contributor, Author) left a comment:
...

@astubbs (Contributor, Author) left a comment:
lgtm

…e out of order processing (confluentinc#534)

Under unrealistically high load with no-op processing, broker poller unblocking a partition could cause ProcessingShard to skip forward in its entries and take work out of order.

This was discovered while fixing a synthetic high-performance benchmark, after PR #530 (an O(n) algorithm was fixed to O(1)) created the state for the race condition to appear. It probably could not happen without that fix, as it is related to the performance of certain parts of the system.
…counts

# Conflicts:
#	parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
#	parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java
@astubbs astubbs changed the title perf: Adds a caching layer to work management to alleviate O(n) counting perf: Adds a caching to shard management counts to alleviate O(n) counting Jan 25, 2023
@astubbs astubbs changed the title perf: Adds a caching to shard management counts to alleviate O(n) counting perf: Adds a caching to shard management counts to alleviate shard scanning O(n) Jan 25, 2023
@cla-assistant

cla-assistant bot commented Aug 8, 2023

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you all sign our Contributor License Agreement before we can accept your contribution.
1 out of 3 committers have signed the CLA.

✅ acktsap
❌ astubbs
❌ nachomdo
