Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 225: Implement list streams controller API #242

Merged
merged 16 commits into from
Apr 26, 2021
Prev Previous commit
Next Next commit
Refactor API based on feedback.
Signed-off-by: Sandeep <[email protected]>
  • Loading branch information
shrids committed Apr 22, 2021
commit d3186ebf9fa2f56a88686354dbea83b018eb1177
40 changes: 27 additions & 13 deletions controller-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,11 @@ pub trait ControllerClient: Send + Sync {
* API to list streams under a given scope and continuation token.
* Use the pravega_controller_client::paginator::list_streams to paginate over all the streams.
*/
async fn list_streams(&self, scope: &Scope, token: &str) -> ResultRetry<Option<(Vec<String>, String)>>;
async fn list_streams(
&self,
scope: &Scope,
token: &CToken,
) -> ResultRetry<Option<(Vec<ScopedStream>, CToken)>>;

/**
* API to delete a scope. Note that a scope can only be deleted in the case is it empty. If
Expand Down Expand Up @@ -331,7 +335,11 @@ impl ControllerClient for ControllerClientImpl {
)
}

async fn list_streams(&self, scope: &Scope, token: &str) -> ResultRetry<Option<(Vec<String>, String)>> {
async fn list_streams(
&self,
scope: &Scope,
token: &CToken,
) -> ResultRetry<Option<(Vec<ScopedStream>, CToken)>> {
wrap_with_async_retry!(
self.config.retry_policy.max_tries(MAX_RETRIES),
self.call_list_streams(scope, token)
Expand Down Expand Up @@ -612,13 +620,15 @@ impl ControllerClientImpl {
})
}

async fn call_list_streams(&self, scope: &Scope, token: &str) -> Result<Option<(Vec<String>, String)>> {
async fn call_list_streams(
&self,
scope: &Scope,
token: &CToken,
) -> Result<Option<(Vec<ScopedStream>, CToken)>> {
let operation_name = "ListStreams";
let request: StreamsInScopeRequest = StreamsInScopeRequest {
scope: Some(ScopeInfo::from(scope)),
continuation_token: Some(ContinuationToken {
token: token.to_string(),
}),
continuation_token: Some(ContinuationToken::from(token)),
};
debug!(
"Triggering a request to the controller to list streams for scope {}",
Expand All @@ -630,15 +640,13 @@ impl ControllerClientImpl {
match op_status {
Ok(streams_with_token) => {
let result = streams_with_token.into_inner();
let t: Vec<StreamInfo> = result.streams;
let mut t: Vec<StreamInfo> = result.streams;
if t.is_empty() {
// Empty result from the controller implies no further streams present.
Ok(None)
} else {
// update state with the new set of streams.
let stream_list: Vec<String> =
t.iter().map(|i: &StreamInfo| i.stream.to_string()).collect();

let stream_list: Vec<ScopedStream> = t.drain(..).map(|i| i.into()).collect();
let token: Option<ContinuationToken> = result.continuation_token;
let ct: String = match token.map(|t| t.token) {
None => {
Expand All @@ -653,7 +661,7 @@ impl ControllerClientImpl {
t
}
};
Ok(Some((stream_list, ct)))
Ok(Some((stream_list, CToken::from(ct.as_str()))))
}
}
Err(status) => {
Expand Down Expand Up @@ -1455,9 +1463,15 @@ mod test {

// test list streams
let res = rt
.block_on(controller.list_streams(&scope, &String::from("")))
.block_on(controller.list_streams(&scope, &CToken::empty()))
.expect("list streams");
assert_eq!(res, Some((vec!["s1".to_string()], String::from("123"))))
let expected_stream = ScopedStream {
scope,
stream: Stream {
name: String::from("s1"),
},
};
assert_eq!(res, Some((vec![expected_stream], CToken::from("123"))))
}

#[derive(Default)]
Expand Down
8 changes: 4 additions & 4 deletions controller-client/src/mock_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ impl ControllerClient for MockController {
async fn list_streams(
&self,
scope: &Scope,
_token: &str,
) -> Result<Option<(Vec<String>, String)>, RetryError<ControllerError>> {
_token: &CToken,
) -> Result<Option<(Vec<ScopedStream>, CToken)>, RetryError<ControllerError>> {
let map_guard = self.created_scopes.read().await;
let streams_set = map_guard.get(&scope.name).ok_or(RetryError {
error: ControllerError::OperationError {
Expand All @@ -92,9 +92,9 @@ impl ControllerClient for MockController {
})?;
let mut result = Vec::new();
for stream in streams_set {
result.push(stream.stream.name.clone())
result.push(stream.clone())
}
Ok(Some((result, String::from(""))))
Ok(Some((result, CToken::from("mock_token"))))
}

async fn delete_scope(&self, scope: &Scope) -> Result<bool, RetryError<ControllerError>> {
Expand Down
17 changes: 17 additions & 0 deletions controller-client/src/model_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,23 @@ impl<'a> From<&'a ScopedStream> for StreamInfo {
}
}

impl From<StreamInfo> for ScopedStream {
fn from(value: StreamInfo) -> ScopedStream {
ScopedStream {
scope: Scope::from(value.scope),
stream: Stream::from(value.stream),
}
}
}

impl<'a> From<&'a CToken> for ContinuationToken {
fn from(t: &'a CToken) -> ContinuationToken {
ContinuationToken {
token: t.token.to_owned(),
}
}
}

impl<'a> From<&'a Scope> for ScopeInfo {
fn from(value: &'a Scope) -> ScopeInfo {
ScopeInfo {
Expand Down
69 changes: 35 additions & 34 deletions controller-client/src/paginator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{ControllerClient, ControllerError, ResultRetry};
use futures::prelude::*;
use futures::stream::{self};
use pravega_client_retry::retry_result::RetryError;
use pravega_client_shared::Scope;
use pravega_client_shared::{CToken, Scope, ScopedStream};
use std::vec::IntoIter;
use tracing::info;

Expand Down Expand Up @@ -74,48 +74,49 @@ use tracing::info;
pub fn list_streams(
scope: Scope,
client: &dyn ControllerClient,
) -> impl Stream<Item = Result<String, RetryError<ControllerError>>> + '_ {
) -> impl Stream<Item = Result<ScopedStream, RetryError<ControllerError>>> + '_ {
struct State {
streams: IntoIter<String>,
streams: IntoIter<ScopedStream>,
scope: Scope,
token: String,
token: CToken,
}

// Initial state with an empty Continuation token.
let get_next_stream_async = move |mut state: State| async move {
if let Some(element) = state.streams.next() {
Some((Ok(element), state))
} else {
// execute a request to the controller.
info!(
"Fetch the next set of streams under scope {} using the provided token",
state.scope
);
let res: ResultRetry<Option<(Vec<ScopedStream>, CToken)>> =
client.list_streams(&state.scope, &state.token).await;
match res {
Ok(None) => None,
Ok(Some((list, ct))) => {
// create a consuming iterator
let mut stream_iter = list.into_iter();
Some((
Ok(stream_iter.next()?),
State {
streams: stream_iter,
scope: state.scope.clone(),
token: ct,
},
))
}
Err(e) => Some((Err(e), state)),
}
}
};
stream::unfold(
State {
streams: Vec::new().into_iter(),
scope,
token: String::from(""),
},
move |mut state| async move {
if let Some(element) = state.streams.next() {
Some((Ok(element), state))
} else {
// execute a request to the controller.
info!(
"Fetch the next set of streams under scope {} using the provided token",
state.scope
);
let res: ResultRetry<Option<(Vec<String>, String)>> =
client.list_streams(&state.scope, &state.token).await;
match res {
Ok(None) => None,
Ok(Some((list, ct))) => {
// create a consuming iterator
let mut stream_iter = list.into_iter();
Some((
Ok(stream_iter.next()?),
State {
streams: stream_iter,
scope: state.scope.clone(),
token: ct,
},
))
}
Err(e) => Some((Err(e), state)),
}
}
token: CToken::empty(),
},
get_next_stream_async,
)
}
25 changes: 25 additions & 0 deletions shared/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,31 @@ impl From<&str> for ScopedStream {
}
}

///
/// This represents the continuation token returned by the controller
/// as part of the list streams grpc API.
///
#[derive(new, Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
pub struct CToken {
pub token: String,
}

impl CToken {
pub fn empty() -> CToken {
CToken {
token: String::from(""),
}
}
}

impl From<&str> for CToken {
fn from(string: &str) -> Self {
CToken {
token: string.to_string(),
}
}
}

#[derive(new, Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
pub struct ScopedSegment {
pub scope: Scope,
Expand Down