-
Notifications
You must be signed in to change notification settings - Fork 62
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
dekaf: Implement capturing and appending stats #1923
Closed
Closed
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
5 tasks
Temporarily switching base to master and marking this as non-draft so CI will run and I can get a dev build to test on |
716b510
to
df728d0
Compare
…s using `/authorize/dekaf` and `/authorize/task` Also add a hint for passing a collection name as a topic name, when the binding has renamed that topic
* Connector projections: emit recommended constraints for all fields -- you get everything by default, and you can modify the selection however you like * Schema: Build a schema from the materialization's built spec's `field_selection` and the collection's projections that will match the extracted documents * Extraction: Implement field extraction using the `extractors` crate to emit documents that match the "value schema"
c53899d
to
4b31d7c
Compare
… a Session and write them to the correct ops logs journal Also support filtering logs by the requested shard log level
Then implement some tests to validate field selection logic
Part of dekaf: Improvements to handle higher scale #1876, we want to implement broker fallback so Dekaf can connect to any of the brokers in the cluster if one doesn't respond. An improvement here would be to periodically fetch the metadata from at least one of the responding brokers and update this list of addresses so that future sessions can know about/use any newly created members of the cluster. I don't anticipate changing the topology of our cluster that frequently, and if we do then updating Dekaf's deployment configs isn't that big of a deal. I may eat my hat on this, we'll see. In addition, we want to move people over to the new MSK cluster, so this implements routing new-style connections to a separate cluster with separate credentials.
A couple things to note: * I originally tried to create a single `journal::Client` responsible for appending both logs and stats, but I ended up realizing that `/authorize/task` only allows authoring a token for a single task/prefix at a time. So I took the simpler route of creating two clients, rather than teaching `/authorize/task` how to handle multiple tasks, which has some fairly delicate requirements. * As it turns out, the stats rollups assume the presence of a `shard` field on both logs and stats. So I ended up needing to craft a `ShardRef` that just contains the Dekaf materialization name, and attach it to both the logs and stats documents that get emitted.
I noticed that after roughly 1-2 hours, Dekaf would stop writing logs and stats. I tracked that down to an error appending logs, specifically: ``` Grpc( Status { code: DeadlineExceeded, message: "context deadline exceeded" } ) ``` It turns out that this is the error Gazette returns when the auth token you pass it is expired, and the appending machinery in Dekaf wasn't taking into account token expiry. So this commit refactors `GazetteWriter` to be composed of two `GazetteAppender`s, one for logs and one for stats. Each `GazetteAppender` is capable of refreshing its internal client when neccesary
4b31d7c
to
88e456f
Compare
Rolled into #1840 after all |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Description:
A couple things to note:
journal::Client
responsible for appending both logs and stats, but I ended up realizing that/authorize/task
only allows authoring a token for a single task/prefix at a time. So I took the simpler route of creating two clients, rather than teaching/authorize/task
how to handle multiple tasks, which has some fairly delicate requirements.shard
field on both logs and stats. So I ended up needing to craft aShardRef
that just contains the Dekaf materialization name, and attach it to both the logs and stats documents that get emitted.This change is![Reviewable](https://camo.githubusercontent.com/1541c4039185914e83657d3683ec25920c672c6c5c7ab4240ee7bff601adec0b/68747470733a2f2f72657669657761626c652e696f2f7265766965775f627574746f6e2e737667)