Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose reader positions #465

Merged
merged 12 commits into from
Feb 23, 2024
Prev Previous commit
Next Next commit
return streamcut instead of position
Signed-off-by: Shwetha N <[email protected]>
  • Loading branch information
ShwethaSNayak committed Feb 15, 2024
commit 9757f2db5ddabaf83304ea4f969321b0820368af
31 changes: 26 additions & 5 deletions src/event/reader_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use snafu::Snafu;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::sync::synchronizer::SynchronizerError;

cfg_if::cfg_if! {
if #[cfg(test)] {
Expand Down Expand Up @@ -136,7 +137,7 @@ impl ReaderGroup {
&client_factory,
init_segments,
)
.await;
.await;
ReaderGroup {
name: name.clone(),
config: rg_config.clone(),
Expand Down Expand Up @@ -218,15 +219,35 @@ impl ReaderGroup {
/// Return the latest positions for the given reader.
/// These positions to be used to construct StreamCutV1
///
pub async fn get_reader_positions(

pub async fn get_reader_streamcut(
&self,
reader_id: String,
) -> Result<HashMap<ScopedSegment, Offset>, ReaderGroupStateError> {
) -> Result<StreamCutV1, ReaderGroupStateError> {
let r: Reader = reader_id.into();
self.state.lock().await.get_reader_positions(&r).await
let positions = self.state.lock().await.get_reader_positions(&r).await;
if let Some((seg, offset)) = positions.unwrap().iter().next() {
let scoped_stream = seg.get_scoped_stream();
let mut scoped_segment_map: HashMap<ScopedSegment, i64> = HashMap::new();
scoped_segment_map.insert(seg.clone(), offset.read);
// Return the StreamCutV1 object
Ok(StreamCutV1::new(scoped_stream, scoped_segment_map))
}else {
//Here only possible error thrown will be readr_offline
// Other error like deserialize position are not thrown back
Err(ReaderGroupStateError::ReaderAlreadyOfflineError {
error_msg: format!("Reader already marked offline {:?}", r),
source: SynchronizerError::SyncPreconditionError {
error_msg: String::from("Precondition failure"),
},
})
}


}
}


/// Specifies the ReaderGroupConfig.
/// ReaderGroupConfig::default() ensures the group refresh interval is set to 3 seconds.
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
Expand Down Expand Up @@ -635,4 +656,4 @@ mod tests {
fn test_reader_group_config_builder_invalid() {
let _rg_config = ReaderGroupConfigBuilder::default().build();
}
}
}
Loading