Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 225: Implement list streams controller API #242

Merged
merged 16 commits into from
Apr 26, 2021
Prev Previous commit
Next Next commit
Refactor the implementation.
Signed-off-by: Sandeep <[email protected]>
  • Loading branch information
shrids committed Apr 11, 2021
commit 04e931ced2a0a53814c57f1920dd7c4057f2d9cf
224 changes: 36 additions & 188 deletions controller-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,6 @@ use controller::{
SegmentRanges, SegmentsAtTime, StreamConfig, StreamInfo, StreamsInScopeRequest, StreamsInScopeResponse,
SuccessorResponse, TxnId, TxnRequest, TxnState, TxnStatus, UpdateStreamStatus,
};
use futures::{
future::{self},
stream, Future, Stream,
};
use futures::{StreamExt, TryStreamExt};
use im::{HashMap as ImHashMap, OrdMap};
use ordered_float::OrderedFloat;
use pravega_client_config::credentials::AUTHORIZATION;
Expand All @@ -57,14 +52,13 @@ use pravega_client_retry::wrap_with_async_retry;
use pravega_client_shared::*;
use snafu::Snafu;
use std::convert::{From, Into};
use std::pin::Pin;
use std::str::FromStr;
use std::sync::RwLock;
use tokio::runtime::Runtime;
use tonic::codegen::http::uri::InvalidUri;
use tonic::transport::{Certificate, Channel, ClientTlsConfig, Endpoint, Uri};
use tonic::{metadata::MetadataValue, Code, Request, Status};
use tracing::{debug, info};
use tracing::{debug, info, warn};

#[allow(non_camel_case_types)]
pub mod controller {
Expand Down Expand Up @@ -334,7 +328,7 @@ impl ControllerClient for ControllerClientImpl {
async fn list_streams(&self, scope: &Scope) -> ResultRetry<Vec<String>> {
wrap_with_async_retry!(
self.config.retry_policy.max_tries(MAX_RETRIES),
self.call_list_streamss(scope)
self.call_list_streams(scope)
)
}

Expand Down Expand Up @@ -612,235 +606,89 @@ impl ControllerClientImpl {
})
}

async fn call_list_streamss(&self, scope: &Scope) -> Result<Vec<String>> {
async fn call_list_streams(&self, scope: &Scope) -> Result<Vec<String>> {
use futures::stream::{self, StreamExt};
// Struct used to track the status of state of the results.
struct State {
list: Vec<String>,
token: ContinuationToken,
end_reached: bool,
};
let operation_name = "CreateScope";

let operation_name = "ListStreams";
// Initial state with an empty Continuation token.
let init_state = State {
list: vec![],
token: ContinuationToken {
token: "".to_string(),
},
end_reached: false,
};

let s = stream::unfold(init_state, |mut state| async move {
let stream_result = stream::unfold(init_state, |mut state| async move {
if !state.list.is_empty() {
// return from already fetched list.
let s = state.list.pop().unwrap();
println!("{}", s);
Some((Ok(s), state))
} else if !state.end_reached {
// fetch streams from controller.
// Return from already fetched stream list.
Some((Ok(state.list.pop().unwrap()), state))
} else {
// The list is empty, try fetching it from the controller using the previous continuation token.
let request: StreamsInScopeRequest = StreamsInScopeRequest {
scope: Some(ScopeInfo::from(scope)),
continuation_token: Some(state.token.clone()),
};
println!("Triggering a new request to the controller ...");
debug!(
"Triggering a request to the controller to list streams for scope {}",
scope
);
let op_status: StdResult<tonic::Response<StreamsInScopeResponse>, tonic::Status> =
self.get_controller_client().list_streams_in_scope(request).await;
match op_status {
Ok(streams_with_token) => {
let temp = streams_with_token.into_inner();
let t: Vec<StreamInfo> = temp.streams;
let result = streams_with_token.into_inner();
let t: Vec<StreamInfo> = result.streams;
if t.is_empty() {
state.end_reached = true;
// Empty result from the controller implies no further streams present.
None
} else {
// update state
let r123: Vec<String> =
// update state with the new set of streams.
let stream_list: Vec<String> =
t.iter().map(|i: &StreamInfo| i.stream.to_string()).collect();
state.list.extend_from_slice(r123.as_slice());
// state.list.extend(&r123);
let token: Option<ContinuationToken> = temp.continuation_token;
state.list.extend_from_slice(stream_list.as_slice());
let token: Option<ContinuationToken> = result.continuation_token;
match token {
None => {
println!("None returned for continuation token");
warn!(
"None returned for continuation token list streams API for scope {}",
scope
);
}
Some(ct) => {
println!("Returned token {}", ct.token);
debug!(
"Returned token {} for list streams API under scope {}",
ct.token, scope
);
state.token = ct
}
};
Some((Ok(state.list.pop().unwrap()), state))
}
}
Err(status) => {
println!("Error");
debug!("Error {} while listing streams under scope {}", status, scope);
Some((Err(self.map_grpc_error(operation_name, status).await), state))
}
}
} else {
None
}
});

let r1 = s.collect::<Vec<StdResult<String, ControllerError>>>().await;
let stream_list = stream_result
.collect::<Vec<StdResult<String, ControllerError>>>()
.await;
let (numbers, mut errors): (Vec<Result<String>>, Vec<Result<String>>) =
r1.into_iter().partition(Result::is_ok);
stream_list.into_iter().partition(Result::is_ok);
if errors.is_empty() {
Ok(numbers.into_iter().map(Result::unwrap).collect())
} else {
Err(errors.pop().unwrap().expect_err("Expected Controller Error"))
}
}
async fn call_list_streams(&self, scope: &Scope) -> Result<Vec<String>> {
use futures::stream::{self, StreamExt};
struct State {
list: Vec<String>,
token: ContinuationToken,
end_reached: bool,
};

let operation_name = "CreateScope";

let scope_info: ScopeInfo = ScopeInfo::from(scope);
let empty_token: ContinuationToken = ContinuationToken {
token: "".to_string(),
};
let init_state = State {
list: vec![],
token: ContinuationToken {
token: "".to_string(),
},
end_reached: false,
};

let stream = stream::unfold(init_state, |mut state| async move {
if !state.list.is_empty() {
// return from already fetched list.
let s = state.list.pop().unwrap();
println!("{}", s);
Some((Ok(s), state))
} else if !state.end_reached {
// fetch streams from controller.
let request: StreamsInScopeRequest = StreamsInScopeRequest {
scope: Some(ScopeInfo::from(scope)),
continuation_token: Some(state.token.clone()),
};
println!("Triggering a new request to the controller ...");
let op_status: StdResult<tonic::Response<StreamsInScopeResponse>, tonic::Status> =
self.get_controller_client().list_streams_in_scope(request).await;
match op_status {
Ok(streams_with_token) => {
let temp = streams_with_token.into_inner();
let t: Vec<StreamInfo> = temp.streams;
if t.is_empty() {
state.end_reached = true;
None
} else {
// update state
let r123: Vec<String> =
t.iter().map(|i: &StreamInfo| i.stream.to_string()).collect();
state.list.extend_from_slice(r123.as_slice());
// state.list.extend(&r123);
let token: Option<ContinuationToken> = temp.continuation_token;
match token {
None => {
println!("None returned for continuation token");
}
Some(ct) => {
println!("Returned token {}", ct.token);
state.token = ct
}
};

Some((Ok(state.list.pop().unwrap()), state))
}
}
Err(status) => {
println!("Error");
Some((Err(self.map_grpc_error(operation_name, status).await), state))
}
}
} else {
None
}
});
let r1 = stream.collect::<Vec<Result<String>>>().await;
let request: StreamsInScopeRequest = StreamsInScopeRequest {
scope: Some(scope_info),
continuation_token: Some(empty_token.clone()),
};
let op_status: StdResult<tonic::Response<StreamsInScopeResponse>, tonic::Status> =
self.get_controller_client().list_streams_in_scope(request).await;

let t: Result<Vec<String>> = match op_status {
Ok(streams_with_token) => {
let temp = streams_with_token.into_inner();
let t: Vec<StreamInfo> = temp.streams;
let r: Vec<String> = t.iter().map(|i| i.stream.to_string()).collect();
let token: Option<ContinuationToken> = temp.continuation_token;
match token {
None => println!("None returned for continuation token"),
Some(ct) => println!("Returned token {}", ct.token),
};

Ok(r)
}
Err(status) => Err(self.map_grpc_error(operation_name, status).await),
};
t
// self.get_controller_client().list_streams_in_scope(tonic::Reques::new )

// let stream = stream::unfold(listStream, |state| async move {
// if state.list.is_empty() {
// let next_state = state + 1;
// let yielded = listStream.list.pop().unwrap();
// Some((yielded, next_state))
// } else {
// self.get_controller_client().list_stream
// }
// });
}

async fn call_list_streams2(&self, scope: &Scope) -> Result<Vec<String>> {
let operation_name = "CreateScope";

let scope_info: ScopeInfo = ScopeInfo::from(scope);
let empty_token: ContinuationToken = ContinuationToken {
token: "".to_string(),
};

let request: StreamsInScopeRequest = StreamsInScopeRequest {
scope: Some(scope_info),
continuation_token: Some(empty_token),
};
let op_status: StdResult<tonic::Response<StreamsInScopeResponse>, tonic::Status> =
self.get_controller_client().list_streams_in_scope(request).await;

let t: Result<Vec<String>> = match op_status {
Ok(streams_with_token) => {
let temp = streams_with_token.into_inner();
let t: Vec<StreamInfo> = temp.streams;
let r: Vec<String> = t.iter().map(|i| i.stream.to_string()).collect();
let token: Option<ContinuationToken> = temp.continuation_token;
match token {
None => println!("None returned for continuation token"),
Some(ct) => println!("Returned token {}", ct.token),
};

Ok(r)
}
Err(status) => Err(self.map_grpc_error(operation_name, status).await),
};
t
// self.get_controller_client().list_streams_in_scope(tonic::Reques::new )

// let stream = stream::unfold(listStream, |state| async move {
// if state.list.is_empty() {
// let next_state = state + 1;
// let yielded = listStream.list.pop().unwrap();
// Some((yielded, next_state))
// } else {
// self.get_controller_client().list_stream
// }
// });
}

async fn call_create_scope(&self, scope: &Scope) -> Result<bool> {
use create_scope_status::Status;
Expand Down