-
Notifications
You must be signed in to change notification settings - Fork 18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
expose store_offset and query_offset in consumer #203
Conversation
Codecov ReportAttention:
Additional details and impacted files@@ Coverage Diff @@
## main #203 +/- ##
==========================================
+ Coverage 88.17% 88.56% +0.38%
==========================================
Files 68 68
Lines 5625 5650 +25
==========================================
+ Hits 4960 5004 +44
+ Misses 665 646 -19
☔ View full report in Codecov by Sentry. |
0d376e4
to
52c1278
Compare
5577acf
to
d73ffb3
Compare
d73ffb3
to
0ce70ae
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you add the Offset Not Found Error
test case? for the query offset in case it does not exist.
@Gsantomaggio Sure! Done!! |
Can you add a test using a real offset coming form the chunk ? |
Done! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, just a minor nit :)
src/consumer.rs
Outdated
pub async fn store_offset(&self, offset: u64) -> Result<(), ConsumerStoreOffsetError> { | ||
if self.name.is_none() { | ||
return Err(ConsumerStoreOffsetError::NameMissing); | ||
} | ||
|
||
let result = self | ||
.internal | ||
.client | ||
.store_offset( | ||
self.name.clone().unwrap().as_str(), | ||
self.internal.stream.as_str(), | ||
offset, | ||
) | ||
.await; | ||
|
||
match result { | ||
Ok(()) => Ok(()), | ||
Err(e) => Err(ConsumerStoreOffsetError::Client(e)), | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could make it a little bit shorter like this:
pub async fn store_offset(&self, offset: u64) -> Result<(), ConsumerStoreOffsetError> { | |
if self.name.is_none() { | |
return Err(ConsumerStoreOffsetError::NameMissing); | |
} | |
let result = self | |
.internal | |
.client | |
.store_offset( | |
self.name.clone().unwrap().as_str(), | |
self.internal.stream.as_str(), | |
offset, | |
) | |
.await; | |
match result { | |
Ok(()) => Ok(()), | |
Err(e) => Err(ConsumerStoreOffsetError::Client(e)), | |
} | |
} | |
pub async fn store_offset(&self, offset: u64) -> Result<(), ConsumerStoreOffsetError> { | |
if let Some(name) = &self.name { | |
self.internal | |
.client | |
.store_offset(name, self.internal.stream.as_str(), offset) | |
.await | |
.map(Ok)? | |
} else { | |
Err(ConsumerStoreOffsetError::NameMissing) | |
} | |
} |
same for query_offset
.
@Gsantomaggio and @wolf4ood for the review! I also implemented the last suggestion as having more compact code is better! |
This closes #195
Exposes store_offset and query_offset in Consumer struct