Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: streaming delta api #751

Open
wants to merge 30 commits into
base: main
Choose a base branch
from

Conversation

kwasniew
Copy link
Contributor

@kwasniew kwasniew commented Feb 19, 2025

About the changes

This PR adds streaming support for delta events.

Important terminology:

  • SSE event type: unleash-connected and unleash-updated
  • Unleash delta API event type: hydration, feature-updated, feature-removed, segment-updated, segment-removed

On the wire it looks like this:

event: unleash-connected
data: {...events array with hydration event...}

or

event: unleash-updated
data: {... events array with feature-update event...}

When SDK connects to edge for the first time it gets unleash-connected event with the hydration event type.
On each subsequent update in Unleash only unleash-updated events are propagated with the changes that the SDK hasn't seen yet. For each connection Edge remembers the last event id that has been broadcasted before.

Important files

Discussion points

Copy link

github-actions bot commented Feb 19, 2025

Dependency Review

✅ No vulnerabilities or license issues or OpenSSF Scorecard issues found.

OpenSSF Scorecard

PackageVersionScoreDetails

Scanned Files

@kwasniew kwasniew marked this pull request as draft February 20, 2025 08:26
@kwasniew kwasniew marked this pull request as ready for review February 21, 2025 10:35

if tokio::time::timeout(std::time::Duration::from_secs(2), async {
loop {
if let Some(Ok(event)) = stream.next().await {
let next = stream.next().await;
if let Some(Ok(event)) = next {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do match instead

if tokio::time::timeout(std::time::Duration::from_secs(2), async {
loop {
if let Some(Ok(event)) = stream.next().await {
let next = stream.next().await;
if let Some(Ok(event)) = next {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

match

assert_eq!(initial_features.query, update.query);
assert_eq!(initial_features.version, update.version);
assert_ne!(initial_features.features, update.features);
// TODO: uncomment when we can filter by id
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can uncomment this one now since we do filtering

&self,
query: StreamingQuery,
) -> Result<DeltaEvent, EdgeError> {
// do we need filter_set for hydration event?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do need to filter hydration event features. will be done in another PR

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Status: In Progress
Development

Successfully merging this pull request may close these issues.

2 participants